[DO NOT MERGE] Add Implementation for Exposing Brokers as Subscribable APIs over Web-friendly Protocols #1944
senthuran16 wants to merge 23 commits into
Conversation
…ing policy is defined
📝 Walkthrough
This pull request implements WebBroker API protocol mediation, enabling bidirectional WebSocket-to-Kafka message transformation via declarative APIs and policies. Changes span the controller management plane (REST endpoints, persistence, xDS translation) and event runtime (WebSocket receiver, policy enforcement, dynamic binding). The feature introduces new binding types, connector interfaces for protocol-agnostic policies, and xDS support for dynamic configuration. Documentation includes architecture specifications, REST API contracts, implementation guides, and operational walkthroughs.
Sequence Diagram
sequenceDiagram
participant User as API User
participant Ctrl as Gateway Controller
participant DB as SQLite Config
participant xDS as xDS Stream
participant Runtime as Event Runtime
participant Hub as Message Hub
participant WSClient as WebSocket Client
participant Kafka as Kafka
User->>Ctrl: POST /webbroker-apis (WebBrokerApi spec)
Ctrl->>DB: store WebBrokerApi config
Ctrl->>Ctrl: TranslateWebBrokerApisToEventChannelConfigs
Ctrl->>xDS: push EventChannelConfig resource
xDS->>Runtime: EventChannelResource (channels, policies, receiver, broker-driver)
Runtime->>Runtime: build policy chains (ConnInit, Produce, Consume)
Runtime->>Hub: register WebBrokerApi binding
Runtime->>Runtime: create Kafka broker-driver
Runtime->>Runtime: instantiate WebBrokerApi receiver (WebSocket)
WSClient->>Runtime: WebSocket upgrade + X-channel header
Runtime->>Hub: ProcessConnectionInitRequest
Hub->>Hub: execute on_connection_init.request policies
Hub-->>Runtime: validated, topic/chain config
alt Policy Short-circuits
Runtime-->>WSClient: 403 Forbidden
else Policies Accept
Runtime->>Kafka: subscribe to consume topics
Runtime->>Hub: ProcessConnectionInitResponse
Hub-->>Runtime: response headers applied
loop Bidirectional Flow
WSClient->>Runtime: produce message
Runtime->>Hub: ProcessProduce(chainKey)
Hub-->>Runtime: transformed message + target topic
Runtime->>Kafka: publish(produceTopic, message)
Kafka->>Runtime: message from consumeTopics
Runtime->>Hub: ProcessConsume(chainKey)
Hub-->>Runtime: transformed message
Runtime->>WSClient: WebSocket binary frame
end
end
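The produce leg of the loop in the diagram can be sketched roughly as follows. This is a minimal illustration, not the PR's code: message, policyHub, publisher, hubFunc, memoryPublisher, errRejected, and handleProduce are all hypothetical names, and the real hub and broker-driver interfaces may differ.

```go
package main

import "errors"

// message is a hypothetical, trimmed-down stand-in for the runtime's message type.
type message struct {
	Topic   string
	Payload []byte
}

// policyHub is an assumed interface: it returns the transformed message and
// whether a policy short-circuited the chain.
type policyHub interface {
	ProcessProduce(chainKey string, msg message) (message, bool, error)
}

// publisher is an assumed abstraction over the Kafka broker driver.
type publisher interface {
	Publish(topic string, payload []byte) error
}

// errRejected is a hypothetical sentinel for a short-circuited produce chain.
var errRejected = errors.New("message rejected by produce policy")

// handleProduce runs the produce chain, then publishes the transformed
// message to its target topic unless a policy short-circuited.
func handleProduce(h policyHub, p publisher, chainKey string, in message) error {
	out, shortCircuited, err := h.ProcessProduce(chainKey, in)
	if err != nil {
		return err
	}
	if shortCircuited {
		return errRejected
	}
	return p.Publish(out.Topic, out.Payload)
}

// hubFunc adapts a plain function to policyHub, which is handy for experiments.
type hubFunc func(chainKey string, msg message) (message, bool, error)

func (f hubFunc) ProcessProduce(chainKey string, msg message) (message, bool, error) {
	return f(chainKey, msg)
}

// memoryPublisher records published messages instead of talking to Kafka.
type memoryPublisher struct{ sent []message }

func (m *memoryPublisher) Publish(topic string, payload []byte) error {
	m.sent = append(m.sent, message{Topic: topic, Payload: payload})
	return nil
}
```

The consume leg would mirror this shape, with the broker as the source and the WebSocket connection as the sink.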
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 inconclusive)
✅ Passed checks (4 passed)
Dependency Validation Results
Dependency name: golang.org/x/crypto
Dependency name: golang.org/x/mod
Dependency name: github.com/gorilla/websocket
Dependency name: github.com/wso2/api-platform/sdk/core
Dependency name: golang.org/x/mod
Dependency name: github.com/gorilla/websocket
Dependency name: github.com/knadh/koanf/providers/file
Dependency name: github.com/gorilla/websocket
Dependency name: github.com/gorilla/websocket
Dependency name: github.com/mattn/go-sqlite3
Actionable comments posted: 10
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
event-gateway/gateway-runtime/internal/runtime/runtime.go (1)
445-490: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Risk of binding twice to the same ports when static + xDS are both configured.
When ControlPlane.Enabled is true, Run unconditionally creates and appends a WebSocket server (and WebSub HTTP/HTTPS servers) bound to the configured ports. If LoadChannels already added matching servers at Lines 406-428 because static bindings were present, both will call ListenAndServe on the same port and one will fail with "address already in use".
Consider gating these creations on whether servers for that port/role already exist in r.servers, or making xDS mode mutually exclusive with static channel binding loading.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@event-gateway/gateway-runtime/internal/runtime/runtime.go` around lines 445 - 490, The current Run block unconditionally creates WebSocket/WebSub servers when r.cfg.ControlPlane.Enabled is true, which can duplicate servers already added by LoadChannels; modify Run to first check r.servers for an existing managed server matching the intended role/port before calling newManagedServer and appending/running it. Specifically, in the ControlPlaneEnabled branch around the WebSocket/WebSub creation, use a lookup (by port and role/name) against r.servers (the slice populated by LoadChannels) and only call newManagedServer("WebSocket", ...), newManagedServer("WebSub-HTTP", ...), or newManagedServer("WebSub-HTTPS", ...) and start r.runServer if no existing server is found; keep existing error handling and locking semantics.
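The gating idea can be sketched as a lookup over the already-registered servers before a new one is created. This is only an illustration under assumptions: managedServer, its fields, findServer, and ensureServer are hypothetical stand-ins, and the real newManagedServer and r.servers in runtime.go may look different.

```go
package main

// managedServer is a hypothetical stand-in for the runtime's server wrapper;
// the real type in runtime.go may carry different fields.
type managedServer struct {
	Role string // e.g. "WebSocket", "WebSub-HTTP"
	Port int
}

// findServer returns the first registered server already bound to port, or nil.
func findServer(servers []*managedServer, port int) *managedServer {
	for _, s := range servers {
		if s.Port == port {
			return s
		}
	}
	return nil
}

// ensureServer appends a server for role/port only when no registered server
// already owns that port. It returns the updated slice and whether a server
// was added.
func ensureServer(servers []*managedServer, role string, port int) ([]*managedServer, bool) {
	if findServer(servers, port) != nil {
		return servers, false // static channel loading already claimed the port
	}
	return append(servers, &managedServer{Role: role, Port: port}), true
}
```

Keying the lookup on the port alone is a deliberate simplification here, since two listeners cannot share a port regardless of role.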
🧹 Nitpick comments (6)
gateway/gateway-controller/pkg/utils/api_deployment.go (1)
657-746: 💤 Low value
Update documentation and consider extracting vhost resolution logic to reduce duplication.
The function-level doc comment (lines 653–656) and the APIDeploymentParams.Kind comment (line 47) still reference only RestAPI and WebSubAPI, but the code now handles WebBrokerApi as well. Update both to include the third kind.
Additionally, the three case blocks in resolveVhostSentinels are nearly identical (they differ only in the type assertion) and duplicate ~60 lines of logic. Extracting the vhost resolution logic into a helper function that operates on the common anonymous Vhosts struct would eliminate this repetition and make future vhost-field changes or additional kinds a single-line addition.
♻️ Possible helper extraction sketch
// resolveVhostsInPlace fills or resolves a vhosts pointer using the gateway-default
// sentinel rules, returning the (possibly new) pointer.
func resolveVhostsInPlace(
	vhosts *struct {
		Main    string  `json:"main" yaml:"main"`
		Sandbox *string `json:"sandbox,omitempty" yaml:"sandbox,omitempty"`
	},
	routerCfg *config.RouterConfig,
) *struct {
	Main    string  `json:"main" yaml:"main"`
	Sandbox *string `json:"sandbox,omitempty" yaml:"sandbox,omitempty"`
} {
	if vhosts == nil {
		out := &struct {
			Main    string  `json:"main" yaml:"main"`
			Sandbox *string `json:"sandbox,omitempty" yaml:"sandbox,omitempty"`
		}{Main: routerCfg.VHosts.Main.Default}
		if sb := routerCfg.VHosts.Sandbox.Default; sb != "" {
			out.Sandbox = &sb
		}
		return out
	}
	if vhosts.Main == constants.VHostGatewayDefault {
		vhosts.Main = routerCfg.VHosts.Main.Default
	}
	if vhosts.Sandbox != nil && *vhosts.Sandbox == constants.VHostGatewayDefault {
		if resolved := routerCfg.VHosts.Sandbox.Default; resolved != "" {
			vhosts.Sandbox = &resolved
		} else {
			vhosts.Sandbox = nil
		}
	}
	return vhosts
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@gateway/gateway-controller/pkg/utils/api_deployment.go` around lines 657 - 746, The comments mentioning supported kinds are out of date: update the function-level doc for resolveVhostSentinels and the APIDeploymentParams.Kind comment to include WebBrokerApi; then remove the duplicated vhost-handling blocks by extracting the common logic into a helper (e.g., resolveVhostsInPlace) that accepts/returns the anonymous VHosts struct and routerCfg, and call that helper from each type assertion branch (api.RestAPI, api.WebSubAPI, api.WebBrokerApi) to set c.Spec.Vhosts and *cfg accordingly.
event-gateway/gateway-runtime/internal/hub/hub.go (2)
533-562: ⚖️ Poor tradeoff
Chain key reuse may cause confusion.
Line 543 reuses SubscribeChainKey to store on_connection_init.request policies. This overloads the semantic meaning of SubscribeChainKey and conflates two distinct lifecycle events (subscription vs. connection initialization). If a WebBroker API needs different policies for WebSocket handshake vs. topic subscription, this reuse creates ambiguity in configuration and debugging.
Consider adding a dedicated ConnectionInitRequestChainKey field to ChannelBinding to clarify intent and support independent policy configuration.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@event-gateway/gateway-runtime/internal/hub/hub.go` around lines 533 - 562, The code incorrectly reuses ChannelBinding.SubscribeChainKey inside ProcessConnectionInitRequest to run on_connection_init.request policies, which conflates subscription and connection-init lifecycle policies; add a new field on ChannelBinding (e.g., ConnectionInitRequestChainKey) and update ProcessConnectionInitRequest to use h.engine.GetChain(binding.ConnectionInitRequestChainKey) and ExecuteRequestHeaderPolicies with that key (falling back to SubscribeChainKey only if desired), adjust MessageToRequestHeaderContext/Application calls to reference the new key where appropriate, and ensure logging (e.g., logShortCircuit) and error messages include the new chain key symbol for clarity.
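The optional fallback mentioned in this comment could look roughly like the sketch below. It assumes the proposed ConnectionInitRequestChainKey field exists; channelBinding and connInitRequestChainKey are hypothetical names that mirror only the two fields under discussion, not the real ChannelBinding type.

```go
package main

// channelBinding mirrors only the two fields relevant here. The dedicated
// ConnectionInitRequestChainKey field is the reviewer's proposal, not
// existing code.
type channelBinding struct {
	SubscribeChainKey             string
	ConnectionInitRequestChainKey string
}

// connInitRequestChainKey prefers the dedicated key and falls back to the
// subscribe key, so bindings created before the new field keep working.
func connInitRequestChainKey(b channelBinding) string {
	if b.ConnectionInitRequestChainKey != "" {
		return b.ConnectionInitRequestChainKey
	}
	return b.SubscribeChainKey
}
```

Whether the fallback is desirable at all depends on how strictly the two lifecycle events should be separated; dropping it makes misconfiguration visible sooner.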
609-660: 💤 Low value
Silent no-op when chain is missing may hinder debugging.
Lines 626-628 silently return (msg, false, nil) when the chain key is registered but h.engine.GetChain(chainKey) returns nil. The comment "this might be OK if the chain has no policies" suggests this is intentional, but in practice a registered chain key that resolves to nil often indicates a configuration error (e.g., chain not loaded, xDS sync failure, typo in chain key).
Emitting a debug or warning log when a non-empty chain key resolves to nil would aid troubleshooting without failing the request:
📋 Suggested logging enhancement
 	chain := h.engine.GetChain(chainKey)
 	if chain == nil {
-		// Chain key not found - this might be OK if the chain has no policies
+		slog.Debug("Chain key not found in policy engine; skipping policy execution",
+			"binding", bindingName,
+			"chain_key", chainKey)
 		return msg, false, nil
 	}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@event-gateway/gateway-runtime/internal/hub/hub.go` around lines 609 - 660, ProcessByChainKey currently silently returns when h.engine.GetChain(chainKey) yields nil, which hides configuration issues; update the nil branch after calling h.engine.GetChain(chainKey) to emit a warning/debug log that includes the chainKey and bindingName (e.g., use the Hub logger like h.logger.Warnf or equivalent) indicating the chain resolved to nil, then preserve the existing behavior of returning (msg, false, nil). Reference: ProcessByChainKey and h.engine.GetChain(chainKey).
event-gateway/gateway-runtime/internal/xdsclient/handler.go (2)
240-327: 🏗️ Heavy lift
Repeated map[string]interface{} traversal is brittle and hard to maintain.
toWebSubApiBinding and toWebBrokerApiBinding reimplement JSON decoding by hand. Since you already have data (the raw JSON bytes) just before json.Unmarshal(data, &ecr) at Line 185, a cleaner approach is to keep ecr.Channels/ecr.Policies as json.RawMessage and decode them into the kind-specific structs ([]ChannelEntry/PoliciesEntry or map[string]WebBrokerChannelEntry/ProtocolMediationPolicies) once Kind is known. That eliminates the manual type-assertion ladder, reduces panic surface on unexpected shapes, and keeps the typed structs (Lines 73-95) authoritative.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@event-gateway/gateway-runtime/internal/xdsclient/handler.go` around lines 240 - 327, The toWebSubApiBinding function is manually traversing ecr.Channels and ecr.Policies as map[string]interface{} which is brittle; instead change the EventChannelResource struct so Channels and Policies are json.RawMessage (or keep raw data from earlier) and, once Kind is known, unmarshal ecr.Channels into []ChannelEntry and ecr.Policies into PoliciesEntry (or the appropriate typed structs) before calling toWebSubApiBinding; update toWebSubApiBinding to remove the type-assertion ladders and use the decoded []ChannelEntry/PoliciesEntry directly (referencing toWebSubApiBinding, EventChannelResource, ChannelEntry, and PoliciesEntry to locate the changes).
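A minimal sketch of the deferred-decoding approach, using only encoding/json from the standard library. The struct shapes, JSON tags, and kind strings below are assumptions for illustration; the real EventChannelResource, ChannelEntry, and WebBrokerChannelEntry carry more fields and may use different names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// channelEntry and webBrokerChannelEntry are hypothetical, trimmed-down
// shapes; field names and JSON tags are assumptions.
type channelEntry struct {
	Name string `json:"name"`
}

type webBrokerChannelEntry struct {
	ProduceTo   []string `json:"produceTo"`
	ConsumeFrom []string `json:"consumeFrom"`
}

// eventChannelResource keeps the channels payload raw until Kind is known.
type eventChannelResource struct {
	Kind     string          `json:"kind"`
	Channels json.RawMessage `json:"channels"`
}

// decodeChannels unmarshals the raw channels payload into the typed shape
// that matches Kind, returning an error instead of panicking on bad input.
func decodeChannels(data []byte) (any, error) {
	var ecr eventChannelResource
	if err := json.Unmarshal(data, &ecr); err != nil {
		return nil, fmt.Errorf("decode resource: %w", err)
	}
	switch ecr.Kind {
	case "WebSubApi": // assumed kind string
		var chans []channelEntry
		if err := json.Unmarshal(ecr.Channels, &chans); err != nil {
			return nil, fmt.Errorf("decode WebSubApi channels: %w", err)
		}
		return chans, nil
	case "WebBrokerApi": // assumed kind string
		var chans map[string]webBrokerChannelEntry
		if err := json.Unmarshal(ecr.Channels, &chans); err != nil {
			return nil, fmt.Errorf("decode WebBrokerApi channels: %w", err)
		}
		return chans, nil
	default:
		return nil, fmt.Errorf("unsupported kind %q", ecr.Kind)
	}
}
```

A shape mismatch (for example, an array where a map is expected) surfaces as an ordinary decode error here, rather than as a failed type assertion deep inside a traversal.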
66-95: ⚡ Quick win
Unused struct definitions for nested decoding.
ProtocolMediationPolicies and ConnectionInitPolicies are declared but never used; toWebBrokerApiBinding decodes the same shapes manually from map[string]interface{} (Lines 422-439). Additionally, the doc comments above describe on_connection_init as a ConnectionInitPolicies with request/response, while the runtime decoder at Line 426 treats it as a flat []interface{} array, making the type description misleading.
Consider either driving the decoding through these structs (via json.Unmarshal on the inner payload) or removing the unused types and aligning the doc comment with the flat-array shape actually consumed.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@event-gateway/gateway-runtime/internal/xdsclient/handler.go` around lines 66 - 95, The declared types ProtocolMediationPolicies and ConnectionInitPolicies are unused and their doc comment for on_connection_init mismatches the runtime decoder in toWebBrokerApiBinding; fix by either (A) using these structs to decode the inner payloads instead of manual map[string]interface{} parsing, e.g. in toWebBrokerApiBinding unmarshal the channels' "policies" and "on_connection_init" JSON into ProtocolMediationPolicies/ConnectionInitPolicies so the types are actually exercised, or (B) remove ProtocolMediationPolicies and ConnectionInitPolicies and update the comments to reflect the flat []interface{} shape the decoder expects; update toWebBrokerApiBinding accordingly and keep symbols ProtocolMediationPolicies, ConnectionInitPolicies, WebBrokerChannelEntry, and toWebBrokerApiBinding in sync.
event-gateway/gateway-runtime/internal/runtime/runtime.go (1)
814-874: 💤 Low value
Two helpers diverge subtly; consider consolidating.
extractTopicsFromChannelPolicies (consume only) and extractAllTopicsFromChannelPolicies (produce + consume) duplicate the same defaulting logic. The latter sets hasConsumeTopics but always falls back to the normalized channel name in both branches at Lines 858-866, so the two flags currently produce identical output for the produce/consume defaults. A single helper returning a {produce, consume []string} struct would remove the duplication and the easy-to-miss double assignment.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@event-gateway/gateway-runtime/internal/runtime/runtime.go` around lines 814 - 874, Both helpers duplicate defaulting/dedup logic; replace extractTopicsFromChannelPolicies and extractAllTopicsFromChannelPolicies with a single helper (e.g., extractProduceConsumeTopics) that returns a struct or two slices {Produce []string, Consume []string}; implement the logic once: collect/uniq produce topics from channelDef.ProduceTo (fall back to binding.NormalizeTopicSegment(channelName) when none), collect/uniq consume topics from channelDef.ConsumeFrom (fall back to binding.NormalizeTopicSegment(channelName) when none), and return de-duplicated slices for each; update all call sites to use the new helper and remove the old functions.
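One possible shape for the consolidated helper is sketched below. channelDef, channelTopics, and uniqueOrDefault are hypothetical, and normalizeTopicSegment is only a placeholder for binding.NormalizeTopicSegment, whose real normalization rules are not shown in this review.

```go
package main

import "strings"

// channelDef is a hypothetical stand-in for the channel definition type.
type channelDef struct {
	ProduceTo   []string
	ConsumeFrom []string
}

// channelTopics groups both topic lists so callers get them from one helper.
type channelTopics struct {
	Produce []string
	Consume []string
}

// normalizeTopicSegment is a placeholder for binding.NormalizeTopicSegment;
// the real normalization rules may differ.
func normalizeTopicSegment(name string) string {
	return strings.ReplaceAll(strings.Trim(name, "/"), "/", ".")
}

// uniqueOrDefault de-duplicates topics while preserving order and falls back
// to def when no topics are configured.
func uniqueOrDefault(topics []string, def string) []string {
	seen := make(map[string]struct{}, len(topics))
	out := make([]string, 0, len(topics))
	for _, t := range topics {
		if _, dup := seen[t]; dup {
			continue
		}
		seen[t] = struct{}{}
		out = append(out, t)
	}
	if len(out) == 0 {
		return []string{def}
	}
	return out
}

// extractProduceConsumeTopics applies the defaulting and de-duplication
// logic once for both directions.
func extractProduceConsumeTopics(channelName string, def channelDef) channelTopics {
	fallback := normalizeTopicSegment(channelName)
	return channelTopics{
		Produce: uniqueOrDefault(def.ProduceTo, fallback),
		Consume: uniqueOrDefault(def.ConsumeFrom, fallback),
	}
}
```

Callers that only need consume topics can read the Consume field, which removes the need for a second, consume-only helper.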
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@docs/rest-apis/gateway/schemas.md`:
- Around line 1528-1529: The docs currently allow a WebBroker channel with
neither produceTo nor consumeFrom; update the OpenAPI schema for the WebBroker
channel to require at least one mapping by adding an anyOf (or oneOf) clause
that enforces required: ["produceTo"] OR required: ["consumeFrom"], and then
update the markdown docs (the table rows for produceTo and consumeFrom) to
mention the new rule (e.g., "At least one of produceTo or consumeFrom is
required"). Reference the produceTo and consumeFrom properties and the WebBroker
channel schema when making the change so the generated docs and source OpenAPI
reflect the constraint.
- Around line 774-776: The examples for WebSubAPIRequest and WebSubAPI must be
updated to match the new WebhookAPIData contract: replace array-based channels
with an object keyed by channel name (matching the documented property name
channels) and change per-channel policy definitions to use event-scoped policies
(as defined by WebSubAllChannelPolicies and WebSubChannel) instead of the old
array entries with method; update any example payloads and descriptions to show
a channel name key mapped to a WebSubChannel object containing policies
organized by event type and any per-channel overrides under additionalProperties
semantics so examples align with WebhookAPIData, WebSubChannel,
WebSubAllChannelPolicies, channels and policies.
In `@docs/rest-apis/gateway/websub-api-management.md`:
- Around line 512-516: The example payload for the "Update API key" endpoint
doesn't match the documented schema APIKeyUpdateRequest: the example shows a
"name" property while the schema defines no request properties; fix by either
adding a "name" string property (and description) to the APIKeyUpdateRequest
schema so the example is valid, or change the example JSON to an empty object
(or whatever the schema expects) and update the example caption to reflect no
body required—ensure you modify the APIKeyUpdateRequest schema and the example
block consistently so clients see a single authoritative contract.
In `@event-gateway/gateway-runtime/internal/connectors/types.go`:
- Around line 52-60: Add a brief comment above the ProcessConnectionInitResponse
method in the interface explaining that, unlike
ProcessConnectionInitRequest/ProcessProduce/ProcessConsume which return
(*Message, bool, error) and support policy chain short-circuiting,
ProcessConnectionInitResponse intentionally does not support short-circuiting
and therefore omits the bool; note that response policies always run to
completion and implementations must not rely on a ShortCircuited flag for this
method.
In `@event-gateway/gateway-runtime/internal/hub/hub.go`:
- Around line 564-591: ProcessConnectionInitResponse is missing a check for
policy short-circuiting after executing request header policies for
binding.OutboundChainKey: after calling h.engine.ExecuteRequestHeaderPolicies
(and before ApplyRequestHeaderResult) inspect result.ShortCircuited and handle
it consistently with other flows (return early). Update
ProcessConnectionInitResponse to treat a short-circuited result the same way as
other methods — either return an appropriate error (to reject the handshake) or
change the function signature if the caller must distinguish short-circuit
success vs. failure; ensure you reference the result returned by
ExecuteRequestHeaderPolicies and use result.ShortCircuited to decide the early
return.
In `@event-gateway/gateway-runtime/internal/runtime/runtime.go`:
- Around line 1123-1269: AddWebBrokerApiBinding computes allTopics but never
stores them into r.bindingTopics[wbb.Name], and RemoveWebBrokerApiBinding does
not delete those Kafka topics on teardown; update AddWebBrokerApiBinding (after
building ch/Topics metadata and before releasing the lock) to set
r.bindingTopics[wbb.Name] = allTopics, and update RemoveWebBrokerApiBinding to
call the corresponding broker driver DeleteTopics (or r.registry-created
brokerDriver.DeleteTopics/bd.DeleteTopics with r.bindingTopics[name]) during
removal cleanup (or add a clear comment if topics are intentionally retained) so
WebBrokerApi has the same topic lifecycle as
AddWebSubApiBinding/RemoveWebSubApiBinding.
- Around line 294-403: Channel-level policy chain keys built by
buildWebBrokerApiPolicyChains are placed only into Metadata["channelChains"] but
not recorded on the hub ChannelBinding, so unregisterBindingChains cannot clean
them up; update the binding registration to populate a ChannelChainKeys (or
similar) field on hub.ChannelBinding with per-channel
ConnInitReq/ConnInitResp/Produce/Consume keys (derive from channelChains or
channelChainsToMap) so unregisterBindingChains can remove them. Also unify
BrokerDriver config resolution: instead of directly using
wbb.BrokerDriver.Config with a fallback to Properties here (where you call
r.registry.CreateBrokerDriver), reuse the same broker-driver config resolution
helper used by AddWebBrokerApiBinding (the existing code path that reads
wbb.BrokerDriver.Config) so both static and dynamic load paths resolve
Config/Properties identically before calling r.registry.CreateBrokerDriver.
- Around line 131-136: The constructor-created r.wsMux (and r.websubMux) is
being shadowed in LoadChannels by local wsMux and websubMux variables; replace
the local allocations in LoadChannels with reuse of the existing r.wsMux and
r.websubMux so bindings register on the same mux instance the New constructor
created (and which Run later wires into the WebSocket server), ensuring
AddWebBrokerApiBinding and other registration calls always target
r.wsMux/r.websubMux rather than a locally scoped http.NewServeMux().
In `@event-gateway/gateway-runtime/internal/xdsclient/handler.go`:
- Around line 182-194: The logs in handler.go are emitting sensitive broker
configuration: replace the current slog.Info calls that log the raw JSON and
ecr.BrokerDriver with non-production-safe output — either lower them to
slog.Debug and gate behind debug-level logging, or remove them; specifically
stop logging the entire raw JSON and the ecr.BrokerDriver.Properties field
(redact or omit Properties) when printing EventChannelResource (type
EventChannelResource and variable ecr), and if you keep a debug log create a
small sanitized summary (UUID, Name, Kind, broker driver type only) rather than
the full BrokerDriver object.
In `@gateway/gateway-controller/pkg/utils/api_deployment.go`:
- Around line 311-319: This branch skips validation for api.WebBrokerApi (case
api.WebBrokerApi) which leaves downstream calls like validateArtifactConflicts
and the store/render flow operating on unvalidated specs; create a tracking
issue for adding WebBrokerApi support to config.Validator and reference that
issue ID in the TODO comment, and make the unsupported-kind path in
s.validator.Validate defensive (ensure validator.Validate returns a clear
validation error or empty-but-explicit result for unknown kinds so the caller
(the switch handling api.WebBrokerApi) can either log and return a
ValidationErrorListError via s.logValidationErrors/ValidationErrorListError or
otherwise abort processing instead of proceeding silently).
---
Outside diff comments:
In `@event-gateway/gateway-runtime/internal/runtime/runtime.go`:
- Around line 445-490: The current Run block unconditionally creates
WebSocket/WebSub servers when r.cfg.ControlPlane.Enabled is true, which can
duplicate servers already added by LoadChannels; modify Run to first check
r.servers for an existing managed server matching the intended role/port before
calling newManagedServer and appending/running it. Specifically, in the
ControlPlaneEnabled branch around the WebSocket/WebSub creation, use a lookup
(by port and role/name) against r.servers (the slice populated by LoadChannels)
and only call newManagedServer("WebSocket", ...),
newManagedServer("WebSub-HTTP", ...), or newManagedServer("WebSub-HTTPS", ...)
and start r.runServer if no existing server is found; keep existing error
handling and locking semantics.
---
Nitpick comments:
In `@event-gateway/gateway-runtime/internal/hub/hub.go`:
- Around line 533-562: The code incorrectly reuses
ChannelBinding.SubscribeChainKey inside ProcessConnectionInitRequest to run
on_connection_init.request policies, which conflates subscription and
connection-init lifecycle policies; add a new field on ChannelBinding (e.g.,
ConnectionInitRequestChainKey) and update ProcessConnectionInitRequest to use
h.engine.GetChain(binding.ConnectionInitRequestChainKey) and
ExecuteRequestHeaderPolicies with that key (falling back to SubscribeChainKey
only if desired), adjust MessageToRequestHeaderContext/Application calls to
reference the new key where appropriate, and ensure logging (e.g.,
logShortCircuit) and error messages include the new chain key symbol for
clarity.
- Around line 609-660: ProcessByChainKey currently silently returns when
h.engine.GetChain(chainKey) yields nil, which hides configuration issues; update
the nil branch after calling h.engine.GetChain(chainKey) to emit a warning/debug
log that includes the chainKey and bindingName (e.g., use the Hub logger like
h.logger.Warnf or equivalent) indicating the chain resolved to nil, then
preserve the existing behavior of returning (msg, false, nil). Reference:
ProcessByChainKey and h.engine.GetChain(chainKey).
In `@event-gateway/gateway-runtime/internal/runtime/runtime.go`:
- Around line 814-874: Both helpers duplicate defaulting/dedup logic; replace
extractTopicsFromChannelPolicies and extractAllTopicsFromChannelPolicies with a
single helper (e.g., extractProduceConsumeTopics) that returns a struct or two
slices {Produce []string, Consume []string}; implement the logic once:
collect/uniq produce topics from channelDef.ProduceTo (fall back to
binding.NormalizeTopicSegment(channelName) when none), collect/uniq consume
topics from channelDef.ConsumeFrom (fall back to
binding.NormalizeTopicSegment(channelName) when none), and return de-duplicated
slices for each; update all call sites to use the new helper and remove the old
functions.
In `@event-gateway/gateway-runtime/internal/xdsclient/handler.go`:
- Around line 240-327: The toWebSubApiBinding function is manually traversing
ecr.Channels and ecr.Policies as map[string]interface{} which is brittle;
instead change the EventChannelResource struct so Channels and Policies are
json.RawMessage (or keep raw data from earlier) and, once Kind is known,
unmarshal ecr.Channels into []ChannelEntry and ecr.Policies into PoliciesEntry
(or the appropriate typed structs) before calling toWebSubApiBinding; update
toWebSubApiBinding to remove the type-assertion ladders and use the decoded
[]ChannelEntry/PoliciesEntry directly (referencing toWebSubApiBinding,
EventChannelResource, ChannelEntry, and PoliciesEntry to locate the changes).
- Around line 66-95: The declared types ProtocolMediationPolicies and
ConnectionInitPolicies are unused and their doc comment for on_connection_init
mismatches the runtime decoder in toWebBrokerApiBinding; fix by either (A) using
these structs to decode the inner payloads instead of manual
map[string]interface{} parsing — e.g. in toWebBrokerApiBinding unmarshal the
channels' "policies" and "on_connection_init" JSON into
ProtocolMediationPolicies/ConnectionInitPolicies so the types are actually
exercised, or (B) remove ProtocolMediationPolicies and ConnectionInitPolicies
and update the comments to reflect the flat []interface{} shape the decoder
expects; update toWebBrokerApiBinding accordingly and keep symbols
ProtocolMediationPolicies, ConnectionInitPolicies, WebBrokerChannelEntry, and
toWebBrokerApiBinding in sync.
In `@gateway/gateway-controller/pkg/utils/api_deployment.go`:
- Around line 657-746: The comments mentioning supported kinds are out of date:
update the function-level doc for resolveVhostSentinels and the
APIDeploymentParams.Kind comment to include WebBrokerApi; then remove the
duplicated vhost-handling blocks by extracting the common logic into a helper
(e.g., resolveVhostsInPlace) that accepts/returns the anonymous VHosts struct
and routerCfg, and call that helper from each type assertion branch
(api.RestAPI, api.WebSubAPI, api.WebBrokerApi) to set c.Spec.Vhosts and *cfg
accordingly.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: b29a95a6-f3e4-49ea-9786-692ea01fe929
⛔ Files ignored due to path filters (1)
go.work is excluded by !**/*.work
📒 Files selected for processing (19)
- docs/rest-apis/gateway/schemas.md
- docs/rest-apis/gateway/webbroker-api-management.md
- docs/rest-apis/gateway/websub-api-management.md
- event-gateway/gateway-runtime/cmd/event-gateway/plugins.go
- event-gateway/gateway-runtime/configs/channels-webbrokerapi-example.yaml
- event-gateway/gateway-runtime/configs/channels.yaml
- event-gateway/gateway-runtime/configs/config.toml
- event-gateway/gateway-runtime/internal/binding/types.go
- event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/consumer.go
- event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go
- event-gateway/gateway-runtime/internal/connectors/types.go
- event-gateway/gateway-runtime/internal/hub/hub.go
- event-gateway/gateway-runtime/internal/runtime/runtime.go
- event-gateway/gateway-runtime/internal/xdsclient/handler.go
- gateway/gateway-controller/api/management-openapi.yaml
- gateway/gateway-controller/pkg/api/management/generated.go
- gateway/gateway-controller/pkg/policyxds/event_channel_translator.go
- gateway/gateway-controller/pkg/storage/sql_store.go
- gateway/gateway-controller/pkg/utils/api_deployment.go
✅ Files skipped from review due to trivial changes (2)
- docs/rest-apis/gateway/webbroker-api-management.md
- gateway/gateway-controller/pkg/api/management/generated.go
🚧 Files skipped from review as they are similar to previous changes (10)
- event-gateway/gateway-runtime/cmd/event-gateway/plugins.go
- event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/consumer.go
- event-gateway/gateway-runtime/configs/channels-webbrokerapi-example.yaml
- event-gateway/gateway-runtime/internal/binding/types.go
- gateway/gateway-controller/pkg/storage/sql_store.go
- event-gateway/gateway-runtime/configs/config.toml
- event-gateway/gateway-runtime/configs/channels.yaml
- gateway/gateway-controller/pkg/policyxds/event_channel_translator.go
- gateway/gateway-controller/api/management-openapi.yaml
- event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go
// ProcessConnectionInitResponse applies on_connection_init.response policies during connection handshake.
// Used by protocol mediation (WebBrokerApi) for response customization during handshake.
// Returns the (possibly mutated) message.
func (h *Hub) ProcessConnectionInitResponse(ctx context.Context, bindingName string, msg *connectors.Message) (*connectors.Message, error) {
	binding := h.GetBinding(bindingName)
	if binding == nil {
		return nil, fmt.Errorf("binding not found: %s", bindingName)
	}

	// Apply connection_init response policies if present.
	// Currently using OutboundChainKey for on_connection_init.response
	// In the future, we could add a dedicated field to ChannelBinding if needed.
	if binding.OutboundChainKey != "" {
		chain := h.engine.GetChain(binding.OutboundChainKey)
		if chain != nil {
			reqHeaderCtx := MessageToRequestHeaderContext(msg, binding)
			result, err := h.engine.ExecuteRequestHeaderPolicies(ctx, binding.OutboundChainKey, reqHeaderCtx.SharedContext, reqHeaderCtx)
			if err != nil {
				return nil, fmt.Errorf("connection_init response policy execution failed: %w", err)
			}
			if err := ApplyRequestHeaderResult(result, msg); err != nil {
				return nil, fmt.Errorf("failed to apply connection_init response result: %w", err)
			}
		}
	}

	return msg, nil
}
Missing short-circuit check in response policy execution.
Lines 580-586 execute header policies but do not check result.ShortCircuited. Every other policy execution path in this file (e.g., ProcessSubscribe, ProcessInbound, ProcessByChainKey) checks the short-circuit flag and returns early if set. The omission here is inconsistent and may prevent policies from rejecting connection handshakes during the response phase.
If connection-init response policies intentionally cannot short-circuit, document this constraint. Otherwise, add the short-circuit check:
🔒 Proposed fix to add short-circuit handling
result, err := h.engine.ExecuteRequestHeaderPolicies(ctx, binding.OutboundChainKey, reqHeaderCtx.SharedContext, reqHeaderCtx)
if err != nil {
return nil, fmt.Errorf("connection_init response policy execution failed: %w", err)
}
+ if result.ShortCircuited {
+ logShortCircuit("Connection init response short-circuited", bindingName, binding.OutboundChainKey, result.ImmediateResponse)
+ // Return error or handle short-circuit appropriately for response phase
+ return nil, fmt.Errorf("connection init response short-circuited by policy")
+ }
if err := ApplyRequestHeaderResult(result, msg); err != nil {
Note: The method signature returns `(*Message, error)`, not `(*Message, bool, error)`, so the caller cannot detect short-circuit via a bool. You may need to adjust the return signature or handle short-circuit as an error at this layer.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@event-gateway/gateway-runtime/internal/hub/hub.go` around lines 564 - 591,
ProcessConnectionInitResponse is missing a check for policy short-circuiting
after executing request header policies for binding.OutboundChainKey: after
calling h.engine.ExecuteRequestHeaderPolicies (and before
ApplyRequestHeaderResult) inspect result.ShortCircuited and handle it
consistently with other flows (return early). Update
ProcessConnectionInitResponse to treat a short-circuited result the same way as
other methods — either return an appropriate error (to reject the handshake) or
change the function signature if the caller must distinguish short-circuit
success vs. failure; ensure you reference the result returned by
ExecuteRequestHeaderPolicies and use result.ShortCircuited to decide the early
return.
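A minimal sketch of the ordering suggested above (execution error first, then short-circuit, then apply), assuming the short-circuit is surfaced as an error so the `(*Message, error)` signature can stay as it is. `headerResult`, `ErrShortCircuited`, `applyInitResponse`, and `isShortCircuit` are hypothetical names for illustration, not the hub's actual API.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrShortCircuited is a hypothetical sentinel error; the real hub may
// signal a policy rejection differently.
var ErrShortCircuited = errors.New("connection init response short-circuited by policy")

// headerResult is a stand-in for the engine's result type. Only the
// ShortCircuited flag mentioned in the review is modelled here.
type headerResult struct {
	ShortCircuited bool
	Headers        map[string]string
}

// applyInitResponse checks for an execution error, then for a
// short-circuit, and only then applies the result to the headers.
func applyInitResponse(result headerResult, execErr error, headers map[string]string) error {
	if execErr != nil {
		return fmt.Errorf("connection_init response policy execution failed: %w", execErr)
	}
	if result.ShortCircuited {
		// Return before mutating anything so the handshake can be rejected.
		return ErrShortCircuited
	}
	for k, v := range result.Headers {
		headers[k] = v
	}
	return nil
}

// isShortCircuit lets a caller tell a policy rejection apart from other failures.
func isShortCircuit(err error) bool {
	return errors.Is(err, ErrShortCircuited)
}

func main() {
	headers := map[string]string{}
	err := applyInitResponse(headerResult{ShortCircuited: true}, nil, headers)
	fmt.Println("short-circuited:", isShortCircuit(err), "headers applied:", len(headers))
}
```

A sentinel error keeps the change local to this method; changing the return signature to carry a bool would be the alternative if callers need to treat short-circuit as a non-error outcome.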
Will check this and consider this as a future improvement
@senthuran16, understood. Would you like me to open a follow-up GitHub issue to track the missing short-circuit check in ProcessConnectionInitResponse so it isn't lost?
🧠 Learnings used
Learnt from: Krishanx92
Repo: wso2/api-platform PR: 1496
File: gateway/gateway-controller/pkg/policyxds/combined_cache.go:559-559
Timestamp: 2026-03-26T09:58:10.495Z
Learning: When reviewing Go code in this repo, usages of `cache.NewTestRawResponse` from `github.com/envoyproxy/go-control-plane/pkg/cache/v3` (Go-Control-Plane v0.14.0+) are valid for production use. Do not flag it as a missing/non-existent function or as “test-only”; the function is publicly exported with signature `func NewTestRawResponse(req *discovery.DiscoveryRequest, version string, resources []types.ResourceWithTTL) *RawResponse`.
| wsMux := http.NewServeMux() | ||
| websubMux := http.NewServeMux() | ||
|
|
||
| // Store wsMux for dynamic bindings | ||
| r.wsMux = wsMux | ||
|
|
Constructor-initialized wsMux is discarded.
New already constructs r.wsMux = http.NewServeMux() (Line 108), but LoadChannels allocates a fresh wsMux locally at Line 131 and reassigns r.wsMux at Line 135. If AddWebBrokerApiBinding is called before LoadChannels runs (or LoadChannels is skipped in pure-xDS mode), receivers will register on the original mux while Run() later wires a different mux into the WebSocket server — or vice versa.
Recommend reusing r.wsMux instead of allocating a new one (and the same for the WebSub mux, which is currently shadowed by a local http.NewServeMux()).
🔧 Suggested change
- // Create shared HTTP muxes for port sharing.
- wsMux := http.NewServeMux()
- websubMux := http.NewServeMux()
-
- // Store wsMux for dynamic bindings
- r.wsMux = wsMux
+ // Reuse the muxes created in New() so static and dynamic bindings
+ // share the same routing table.
+ wsMux := r.wsMux
+ websubMux := http.NewServeMux() // TODO: align with r.websubMux (DynamicMux) for dynamic bindings
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| wsMux := http.NewServeMux() | |
| websubMux := http.NewServeMux() | |
| // Store wsMux for dynamic bindings | |
| r.wsMux = wsMux | |
| // Reuse the muxes created in New() so static and dynamic bindings | |
| // share the same routing table. | |
| wsMux := r.wsMux | |
| websubMux := http.NewServeMux() // TODO: align with r.websubMux (DynamicMux) for dynamic bindings | |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@event-gateway/gateway-runtime/internal/runtime/runtime.go` around lines 131 -
136, The constructor-created r.wsMux (and r.websubMux) is being shadowed in
LoadChannels by local wsMux and websubMux variables; replace the local
allocations in LoadChannels with reuse of the existing r.wsMux and r.websubMux
so bindings register on the same mux instance the New constructor created (and
which Run later wires into the WebSocket server), ensuring
AddWebBrokerApiBinding and other registration calls always target
r.wsMux/r.websubMux rather than a locally scoped http.NewServeMux().
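The effect of replacing the mux can be reproduced in isolation. The sketch below is a hedged illustration, not the runtime's code: `runtimeSketch`, `addDynamicBinding`, `loadChannels`, and `statusFor` are hypothetical stand-ins, and the `reuse` flag exists only to compare the two behaviours side by side.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// runtimeSketch is a hypothetical, trimmed-down stand-in for Runtime.
type runtimeSketch struct {
	wsMux *http.ServeMux
}

// newRuntimeSketch creates the mux up front, as New() is described to do.
func newRuntimeSketch() *runtimeSketch {
	return &runtimeSketch{wsMux: http.NewServeMux()}
}

// addDynamicBinding registers a handler on the runtime's current mux.
func (r *runtimeSketch) addDynamicBinding(path string) {
	r.wsMux.HandleFunc(path, func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
}

// loadChannels returns the mux the server would end up serving.
// With reuse it keeps r.wsMux; without it, r.wsMux is replaced by a fresh mux.
func (r *runtimeSketch) loadChannels(reuse bool) *http.ServeMux {
	if !reuse {
		r.wsMux = http.NewServeMux()
	}
	return r.wsMux
}

// statusFor reports the status code the given mux returns for path.
func statusFor(mux *http.ServeMux, path string) int {
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code
}

func main() {
	for _, reuse := range []bool{false, true} {
		r := newRuntimeSketch()
		r.addDynamicBinding("/orders") // registered before loadChannels runs
		served := r.loadChannels(reuse)
		fmt.Printf("reuse=%v status=%d\n", reuse, statusFor(served, "/orders"))
	}
}
```

Without reuse, the handler registered earlier lives on a mux that is no longer served, so the request falls through to the default not-found response.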
Will check this and consider this as a future improvement
@senthuran16, understood. Would you like me to open a follow-up GitHub issue to track the mux reuse alignment (r.wsMux/r.websubMux in LoadChannels) so it isn't lost?
| // Process WebBrokerApi bindings (protocol mediation). | ||
| for _, wbb := range parseResult.WebBrokerApiBindings { | ||
| vhost := defaultVhost(wbb.Vhost) | ||
|
|
||
| // Build API-level policy chains. | ||
| apiConnInitReqKey, _, _, _, err := r.buildWebBrokerApiPolicyChains(wbb, vhost, "") | ||
| if err != nil { | ||
| return fmt.Errorf("failed to build API-level chains for WebBrokerApi %q: %w", wbb.Name, err) | ||
| } | ||
|
|
||
| // Build per-channel policy chains and collect topics. | ||
| channelChains := make(map[string]ChannelPolicyChains) | ||
| allTopics := []string{} // All topics (produce + consume) for ensuring they exist | ||
| topicToChannel := make(map[string]string) // Only consume topics for subscription mapping | ||
|
|
||
| for channelName, channelDef := range wbb.Channels { | ||
| connInitReqKey, connInitRespKey, produceKey, consumeKey, err := r.buildWebBrokerApiPolicyChains(wbb, vhost, channelName) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to build chains for channel %q in WebBrokerApi %q: %w", channelName, wbb.Name, err) | ||
| } | ||
|
|
||
| channelChains[channelName] = ChannelPolicyChains{ | ||
| ConnInitReqKey: connInitReqKey, | ||
| ConnInitRespKey: connInitRespKey, | ||
| ProduceKey: produceKey, | ||
| ConsumeKey: consumeKey, | ||
| } | ||
|
|
||
| // Extract ALL topics (produce + consume) to ensure they exist in Kafka | ||
| allChannelTopics := extractAllTopicsFromChannelPolicies(channelName, channelDef) | ||
| allTopics = append(allTopics, allChannelTopics...) | ||
|
|
||
| // Extract ONLY consume topics for subscription mapping | ||
| consumeTopics := extractTopicsFromChannelPolicies(channelName, channelDef) | ||
| for _, topic := range consumeTopics { | ||
| topicToChannel[topic] = channelName | ||
| } | ||
| } | ||
|
|
||
| // Register binding in hub. | ||
| r.hub.RegisterBinding(hub.ChannelBinding{ | ||
| APIID: wbb.APIID, | ||
| Name: wbb.Name, | ||
| Mode: "protocol-mediation", | ||
| Context: wbb.Context, | ||
| Version: wbb.Version, | ||
| Vhost: vhost, | ||
| SubscribeChainKey: apiConnInitReqKey, | ||
| InboundChainKey: "", // Determined per-channel | ||
| OutboundChainKey: "", // Determined per-channel | ||
| }) | ||
|
|
||
| // Create broker-driver. | ||
| brokerDriverType := wbb.BrokerDriver.Type | ||
| if brokerDriverType == "" { | ||
| brokerDriverType = "kafka" | ||
| } | ||
| brokerDriverConfig := wbb.BrokerDriver.Config | ||
| if brokerDriverConfig == nil { | ||
| brokerDriverConfig = wbb.BrokerDriver.Properties | ||
| } | ||
| brokerDriver, err := r.registry.CreateBrokerDriver(brokerDriverType, brokerDriverConfig) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to create broker-driver for WebBrokerApi %q: %w", wbb.Name, err) | ||
| } | ||
| r.brokerDrivers = append(r.brokerDrivers, brokerDriver) | ||
|
|
||
| hasWS = true | ||
|
|
||
| ch := connectors.ChannelInfo{ | ||
| Name: wbb.Name, | ||
| Mode: "protocol-mediation", | ||
| Context: wbb.Context, | ||
| Version: wbb.Version, | ||
| Vhost: vhost, | ||
| Topics: allTopics, | ||
| Metadata: map[string]interface{}{ | ||
| "channelChains": channelChainsToMap(channelChains), | ||
| "topicToChannel": topicToChannel, | ||
| "channelNames": getChannelNames(wbb.Channels), | ||
| }, | ||
| } | ||
|
|
||
| // Create WebBrokerApi receiver. | ||
| receiverType := wbb.Receiver.Type | ||
| if receiverType == "" { | ||
| receiverType = "websocket" | ||
| } | ||
|
|
||
| ep, err := r.registry.CreateReceiver(receiverType+"-broker-api", connectors.ReceiverConfig{ | ||
| Channel: ch, | ||
| Processor: r.hub, | ||
| BrokerDriver: brokerDriver, | ||
| RuntimeID: r.cfg.RuntimeID, | ||
| Mux: wsMux, | ||
| }) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to create receiver for WebBrokerApi %q: %w", wbb.Name, err) | ||
| } | ||
| r.receivers = append(r.receivers, ep) | ||
|
|
||
| slog.Info("Registered WebBrokerApi binding", | ||
| "name", wbb.Name, | ||
| "context", wbb.Context, | ||
| "version", wbb.Version, | ||
| "receiver", receiverType, | ||
| "topics", allTopics, | ||
| "channels", len(wbb.Channels), | ||
| ) | ||
| } |
Two consistency concerns in the WebBrokerApi LoadChannels path.
- Channel-level chains are built (Lines 305-320) and surfaced via `Metadata["channelChains"]`, but only the API-level `apiConnInitReqKey` is stored on the hub `ChannelBinding` (Lines 334-344). `unregisterBindingChains` (Lines 929-959) only knows about `SubscribeChainKey`/`InboundChainKey`/etc.; per-channel keys for WebBrokerApi will leak on removal. Consider populating `ChannelChainKeys` (or an equivalent) so they participate in cleanup.
- `AddWebBrokerApiBinding` reads only `wbb.BrokerDriver.Config` (Line 1205) while this static path also falls back to `wbb.BrokerDriver.Properties` (Lines 351-354). Recommend aligning both paths to use the same resolution helper to avoid surprises when the YAML uses `properties`.
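For the second item, one possible shape of a shared resolution helper is sketched here. It mirrors the defaulting visible in the static path (type defaults to `kafka`, `Config` falls back to `Properties`); `brokerDriverSpec` and `resolveBrokerDriver` are hypothetical names, and the field types are assumed rather than taken from the binding package.

```go
package main

import "fmt"

// brokerDriverSpec is a hypothetical stand-in for the binding's
// BrokerDriver field; the real field types may differ.
type brokerDriverSpec struct {
	Type       string
	Config     map[string]interface{}
	Properties map[string]interface{}
}

// resolveBrokerDriver returns the driver type (defaulting to "kafka") and
// the effective config, preferring Config and falling back to Properties.
func resolveBrokerDriver(spec brokerDriverSpec) (string, map[string]interface{}) {
	driverType := spec.Type
	if driverType == "" {
		driverType = "kafka"
	}
	cfg := spec.Config
	if cfg == nil {
		cfg = spec.Properties
	}
	return driverType, cfg
}

func main() {
	// A spec that only sets properties still resolves to a usable config.
	driverType, cfg := resolveBrokerDriver(brokerDriverSpec{
		Properties: map[string]interface{}{"brokers": "localhost:9092"},
	})
	fmt.Println(driverType, cfg["brokers"])
}
```

Calling the same helper from both the static and the dynamic path would keep the two from drifting apart again.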
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@event-gateway/gateway-runtime/internal/runtime/runtime.go` around lines 294 -
403, Channel-level policy chain keys built by buildWebBrokerApiPolicyChains are
placed only into Metadata["channelChains"] but not recorded on the hub
ChannelBinding, so unregisterBindingChains cannot clean them up; update the
binding registration to populate a ChannelChainKeys (or similar) field on
hub.ChannelBinding with per-channel ConnInitReq/ConnInitResp/Produce/Consume
keys (derive from channelChains or channelChainsToMap) so
unregisterBindingChains can remove them. Also unify BrokerDriver config
resolution: instead of directly using wbb.BrokerDriver.Config with a fallback to
Properties here (where you call r.registry.CreateBrokerDriver), reuse the same
broker-driver config resolution helper used by AddWebBrokerApiBinding (the
existing code path that reads wbb.BrokerDriver.Config) so both static and
dynamic load paths resolve Config/Properties identically before calling
r.registry.CreateBrokerDriver.
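For the cleanup half of this finding, the per-channel keys could be flattened into a single list that is stored alongside the binding. This is a sketch under assumptions: `channelPolicyChains` copies the four key fields shown in the diff, while `collectChainKeys` and the idea of storing its result on the binding are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// channelPolicyChains mirrors the four per-channel keys shown in the diff.
type channelPolicyChains struct {
	ConnInitReqKey  string
	ConnInitRespKey string
	ProduceKey      string
	ConsumeKey      string
}

// collectChainKeys flattens per-channel keys into a sorted, de-duplicated
// list and skips empty keys, so a cleanup routine can iterate over it.
func collectChainKeys(chains map[string]channelPolicyChains) []string {
	seen := make(map[string]struct{})
	for _, c := range chains {
		for _, k := range []string{c.ConnInitReqKey, c.ConnInitRespKey, c.ProduceKey, c.ConsumeKey} {
			if k != "" {
				seen[k] = struct{}{}
			}
		}
	}
	keys := make([]string, 0, len(seen))
	for k := range seen {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	chains := map[string]channelPolicyChains{
		"orders":  {ConnInitReqKey: "orders/init", ProduceKey: "orders/produce"},
		"updates": {ConsumeKey: "updates/consume"},
	}
	fmt.Println(collectChainKeys(chains))
}
```

Sorting is only there to make the output deterministic, since Go map iteration order is not.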
Will check this and consider this as a future improvement
@senthuran16, understood. Would you like me to open a follow-up GitHub issue to track these two items so they aren't lost?
- Per-channel chain key cleanup: register per-channel `ConnInitReq`/`ConnInitResp`/`Produce`/`Consume` keys on `hub.ChannelBinding` (or equivalent) so `unregisterBindingChains` can clean them up on removal.
- BrokerDriver config resolution parity: align the static `LoadChannels` path (`Config` → `Properties` fallback) with the dynamic `AddWebBrokerApiBinding` path so both resolve broker-driver config identically.
| // AddWebBrokerApiBinding dynamically adds a WebBrokerApi binding at runtime. | ||
| func (r *Runtime) AddWebBrokerApiBinding(wbb binding.WebBrokerApiBinding) error { | ||
| r.mu.Lock() | ||
|
|
||
| vhost := defaultVhost(wbb.Vhost) | ||
|
|
||
| // Build API-level policy chains. | ||
| apiConnInitReqKey, _, _, _, err := r.buildWebBrokerApiPolicyChains(wbb, vhost, "") | ||
| if err != nil { | ||
| r.mu.Unlock() | ||
| return fmt.Errorf("failed to build API-level chains for WebBrokerApi %q: %w", wbb.Name, err) | ||
| } | ||
|
|
||
| // Build per-channel policy chains and collect topics. | ||
| channelChains := make(map[string]ChannelPolicyChains) | ||
| allTopics := []string{} // All topics (produce + consume) for ensuring they exist | ||
| topicToChannel := make(map[string]string) // Only consume topics for subscription mapping | ||
| channelTopics := make(map[string]map[string]string) // Channel-level topic mappings (produceTo, consumeFrom) | ||
|
|
||
| for channelName, channelDef := range wbb.Channels { | ||
| connInitReqKey, connInitRespKey, produceKey, consumeKey, err := r.buildWebBrokerApiPolicyChains(wbb, vhost, channelName) | ||
| if err != nil { | ||
| r.mu.Unlock() | ||
| return fmt.Errorf("failed to build chains for channel %q in WebBrokerApi %q: %w", channelName, wbb.Name, err) | ||
| } | ||
|
|
||
| channelChains[channelName] = ChannelPolicyChains{ | ||
| ConnInitReqKey: connInitReqKey, | ||
| ConnInitRespKey: connInitRespKey, | ||
| ProduceKey: produceKey, | ||
| ConsumeKey: consumeKey, | ||
| } | ||
|
|
||
| // Store channel topic mappings (use defaults if not specified) | ||
| topicMapping := make(map[string]string) | ||
| if channelDef.ProduceTo != nil && channelDef.ProduceTo.Topic != "" { | ||
| topicMapping["produceTo"] = channelDef.ProduceTo.Topic | ||
| } else { | ||
| // Default: use normalized channel name for producing | ||
| topicMapping["produceTo"] = binding.NormalizeTopicSegment(channelName) | ||
| } | ||
| if channelDef.ConsumeFrom != nil && channelDef.ConsumeFrom.Topic != "" { | ||
| topicMapping["consumeFrom"] = channelDef.ConsumeFrom.Topic | ||
| } else { | ||
| // Default: use normalized channel name for consuming | ||
| topicMapping["consumeFrom"] = binding.NormalizeTopicSegment(channelName) | ||
| } | ||
| channelTopics[channelName] = topicMapping | ||
|
|
||
| // Extract ALL topics (produce + consume) to ensure they exist in Kafka | ||
| allChannelTopics := extractAllTopicsFromChannelPolicies(channelName, channelDef) | ||
| allTopics = append(allTopics, allChannelTopics...) | ||
|
|
||
| // Extract ONLY consume topics for subscription mapping | ||
| consumeTopics := extractTopicsFromChannelPolicies(channelName, channelDef) | ||
| for _, topic := range consumeTopics { | ||
| topicToChannel[topic] = channelName | ||
| } | ||
|
|
||
| slog.Info("Built policy chains for WebBrokerApi channel", | ||
| "api", wbb.Name, | ||
| "channel", channelName, | ||
| "topics", allChannelTopics) | ||
| } | ||
|
|
||
| r.hub.RegisterBinding(hub.ChannelBinding{ | ||
| APIID: wbb.APIID, | ||
| Name: wbb.Name, | ||
| Mode: "protocol-mediation", | ||
| Context: wbb.Context, | ||
| Version: wbb.Version, | ||
| Vhost: vhost, | ||
| SubscribeChainKey: apiConnInitReqKey, | ||
| InboundChainKey: "", // Determined per-channel | ||
| OutboundChainKey: "", // Determined per-channel | ||
| }) | ||
|
|
||
| // Create broker-driver. | ||
| brokerDriverType := "kafka" | ||
| if wbb.BrokerDriver.Type != "" { | ||
| brokerDriverType = wbb.BrokerDriver.Type | ||
| } | ||
| brokerDriver, err := r.registry.CreateBrokerDriver(brokerDriverType, wbb.BrokerDriver.Config) | ||
| if err != nil { | ||
| r.mu.Unlock() | ||
| return fmt.Errorf("failed to create broker-driver for WebBrokerApi %q: %w", wbb.Name, err) | ||
| } | ||
| r.activeBrokerDrivers[wbb.Name] = brokerDriver | ||
|
|
||
| ch := connectors.ChannelInfo{ | ||
| Name: wbb.Name, | ||
| Mode: "protocol-mediation", | ||
| Context: wbb.Context, | ||
| Version: wbb.Version, | ||
| Vhost: vhost, | ||
| Topics: allTopics, | ||
| Metadata: map[string]interface{}{ | ||
| "channelChains": channelChainsToMap(channelChains), | ||
| "topicToChannel": topicToChannel, | ||
| "channelNames": getChannelNames(wbb.Channels), | ||
| "channelTopics": channelTopics, | ||
| }, | ||
| } | ||
|
|
||
| // Determine receiver type (websocket, sse, etc.) | ||
| receiverType := "websocket-broker-api" | ||
| if wbb.Receiver.Type != "" { | ||
| receiverType = wbb.Receiver.Type + "-broker-api" | ||
| } | ||
|
|
||
| receiver, err := r.registry.CreateReceiver(receiverType, connectors.ReceiverConfig{ | ||
| Channel: ch, | ||
| Processor: r.hub, | ||
| BrokerDriver: brokerDriver, | ||
| RuntimeID: r.cfg.RuntimeID, | ||
| Mux: r.wsMux, | ||
| }) | ||
| if err != nil { | ||
| r.mu.Unlock() | ||
| return fmt.Errorf("failed to create receiver for WebBrokerApi %q: %w", wbb.Name, err) | ||
| } | ||
| r.activeReceivers[wbb.Name] = receiver | ||
|
|
||
| startNow := r.running | ||
| startCtx := r.runCtx | ||
| r.mu.Unlock() | ||
|
|
||
| // If runtime is already running, start the receiver immediately. | ||
| if startNow { | ||
| if startCtx == nil { | ||
| startCtx = context.Background() | ||
| } | ||
| if err := r.startReceiverWithRetry(startCtx, wbb.Name, receiver); err != nil { | ||
| return fmt.Errorf("failed to start receiver for WebBrokerApi %q: %w", wbb.Name, err) | ||
| } | ||
| } | ||
|
|
||
| slog.Info("Dynamically added WebBrokerApi binding", | ||
| "name", wbb.Name, | ||
| "context", wbb.Context, | ||
| "version", wbb.Version, | ||
| "receiver_type", receiverType, | ||
| "channels", len(wbb.Channels), | ||
| "topics", allTopics) | ||
|
|
||
| return nil | ||
| } |
Missing Kafka topic tracking parity with AddWebSubApiBinding.
AddWebSubApiBinding records all Kafka topics in r.bindingTopics[wsb.Name] (Lines 1013-1019) so RemoveWebSubApiBinding can call bd.DeleteTopics(...) on teardown. AddWebBrokerApiBinding computes allTopics (Line 1138, populated at Line 1174) but never assigns it to r.bindingTopics, and RemoveWebBrokerApiBinding (Lines 1272-1310) does not delete Kafka topics. If parity with WebSubApi cleanup is intended, persist allTopics here and add the corresponding DeleteTopics call on removal; if intentional, a short comment explaining why WebBrokerApi topics are kept across removals would help future maintainers.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@event-gateway/gateway-runtime/internal/runtime/runtime.go` around lines 1123
- 1269, AddWebBrokerApiBinding computes allTopics but never stores them into
r.bindingTopics[wbb.Name], and RemoveWebBrokerApiBinding does not delete those
Kafka topics on teardown; update AddWebBrokerApiBinding (after building
ch/Topics metadata and before releasing the lock) to set
r.bindingTopics[wbb.Name] = allTopics, and update RemoveWebBrokerApiBinding to
call the corresponding broker driver DeleteTopics (or r.registry-created
brokerDriver.DeleteTopics/bd.DeleteTopics with r.bindingTopics[name]) during
removal cleanup (or add a clear comment if topics are intentionally retained) so
WebBrokerApi has the same topic lifecycle as
AddWebSubApiBinding/RemoveWebSubApiBinding.
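If parity with the WebSubApi lifecycle is the goal, the bookkeeping could look roughly like this. It is a sketch only: `topicTracker`, `topicDeleter`, and `fakeDriver` are hypothetical, and the `DeleteTopics(topics []string) error` signature is an assumption, since the real broker-driver signature is not shown in this thread. The real runtime would also do this under `r.mu`.

```go
package main

import "fmt"

// topicDeleter is the only broker-driver capability this sketch needs.
// The signature is assumed for illustration.
type topicDeleter interface {
	DeleteTopics(topics []string) error
}

// fakeDriver records which topics it was asked to delete.
type fakeDriver struct{ deleted []string }

func (f *fakeDriver) DeleteTopics(topics []string) error {
	f.deleted = append(f.deleted, topics...)
	return nil
}

// topicTracker plays the role of r.bindingTopics.
type topicTracker struct {
	bindingTopics map[string][]string
}

func newTopicTracker() *topicTracker {
	return &topicTracker{bindingTopics: make(map[string][]string)}
}

// add records a copy of the binding's topics so later removal can find them.
func (t *topicTracker) add(name string, topics []string) {
	t.bindingTopics[name] = append([]string(nil), topics...)
}

// remove forgets the binding and deletes its topics through the driver.
// Unknown bindings and bindings without topics are a no-op.
func (t *topicTracker) remove(name string, bd topicDeleter) error {
	topics, ok := t.bindingTopics[name]
	if !ok {
		return nil
	}
	delete(t.bindingTopics, name)
	if len(topics) == 0 {
		return nil
	}
	if err := bd.DeleteTopics(topics); err != nil {
		return fmt.Errorf("failed to delete topics for %q: %w", name, err)
	}
	return nil
}

func main() {
	tracker := newTopicTracker()
	driver := &fakeDriver{}
	tracker.add("orders-api", []string{"orders.in", "orders.out"})
	if err := tracker.remove("orders-api", driver); err != nil {
		fmt.Println("remove failed:", err)
		return
	}
	fmt.Println("deleted topics:", driver.deleted)
}
```

If WebBrokerApi topics are meant to outlive the binding, none of this applies and a short comment in `RemoveWebBrokerApiBinding` would be enough.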
Will check this and consider this as a future improvement
@senthuran16, understood. Would you like me to open a GitHub issue to track the missing r.bindingTopics population and DeleteTopics call in AddWebBrokerApiBinding/RemoveWebBrokerApiBinding for future follow-up?
| case api.WebBrokerApi: | ||
| apiName = c.Spec.DisplayName | ||
| apiVersion = c.Spec.Version | ||
| // TODO: Add validation for WebBrokerApi once validator supports it | ||
| // validationErrors := s.validator.Validate(&c) | ||
| // if len(validationErrors) > 0 { | ||
| // s.logValidationErrors(params.Logger, apiID, apiName, validationErrors) | ||
| // return nil, &ValidationErrorListError{Errors: validationErrors} | ||
| // } |
Track validation gap for WebBrokerApi.
Validation is bypassed for WebBrokerApi while RestAPI and WebSubAPI are both validated here. Downstream calls (e.g., validateArtifactConflicts, store/render flow) will then run against an unvalidated spec until the validator gains support for this kind. Consider tracking this with an issue so the TODO is not lost, and keep the unsupported-kind branch in validator.Validate defensive in the meantime.
Would you like me to open a tracking issue for adding WebBrokerApi support to config.Validator?
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@gateway/gateway-controller/pkg/utils/api_deployment.go` around lines 311 -
319, This branch skips validation for api.WebBrokerApi (case api.WebBrokerApi)
which leaves downstream calls like validateArtifactConflicts and the
store/render flow operating on unvalidated specs; create a tracking issue for
adding WebBrokerApi support to config.Validator and reference that issue ID in
the TODO comment, and make the unsupported-kind path in s.validator.Validate
defensive (ensure validator.Validate returns a clear validation error or
empty-but-explicit result for unknown kinds so the caller (the switch handling
api.WebBrokerApi) can either log and return a ValidationErrorListError via
s.logValidationErrors/ValidationErrorListError or otherwise abort processing
instead of proceeding silently).
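What "defensive" could mean for the unsupported-kind branch is sketched below: unknown kinds produce an explicit validation error instead of an empty result that reads as success. `validate`, `validationError`, and the kind strings are illustrative assumptions and do not reflect the actual `config.Validator` API.

```go
package main

import "fmt"

// validationError is a hypothetical, simplified validation result entry.
type validationError struct {
	Field   string
	Message string
}

// validate is a stand-in for a kind-aware validator. Supported kinds get a
// real (here: trivial) check; any other kind yields an explicit error.
func validate(kind, displayName string) []validationError {
	switch kind {
	case "RestApi", "WebSubApi":
		if displayName == "" {
			return []validationError{{Field: "spec.displayName", Message: "must not be empty"}}
		}
		return nil
	default:
		return []validationError{{
			Field:   "kind",
			Message: fmt.Sprintf("unsupported kind %q: no validator registered", kind),
		}}
	}
}

func main() {
	for _, kind := range []string{"RestApi", "WebBrokerApi"} {
		errs := validate(kind, "orders")
		fmt.Printf("kind=%s errors=%d\n", kind, len(errs))
	}
}
```

With this shape the caller decides whether an unsupported kind should abort the deployment or only be logged until validation support lands.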
Will open an issue and track it
@senthuran16, sounds good. If you'd like me to open the tracking issue on your behalf, just let me know and I'll do that right away. Otherwise, I'd suggest referencing the issue number in the TODO comment once created, e.g.:
case api.WebBrokerApi:
apiName = c.Spec.DisplayName
apiVersion = c.Spec.Version
// TODO(#<issue-number>): Add validation for WebBrokerApi once validator supports it
🧠 Learnings used
Learnt from: Saadha123
Repo: wso2/api-platform PR: 1420
File: gateway/gateway-controller/pkg/config/policy_validator.go:149-163
Timestamp: 2026-03-20T15:22:10.323Z
Learning: In the API Gateway controller config/transform/builder code under `gateway/gateway-controller/pkg/**`, treat `ResolvePolicyVersion` in `policy_validator.go` as a validation + definition-existence helper only. At its call sites, do not pass the resolved full semver (e.g., `v1.0.0`) downstream—retain and pass the original major-only policy reference string (e.g., `v1`) to `ConvertAPIPolicyToModel`/`convertAPIPolicyToSDK`, since the policy engine expects major-only format. For empty-version handling, note that the `ResolvePolicyVersion` path is not exercised by LLM policies; those policies resolve versions through the transformer’s `policyVersionResolver` instead.
Learnt from: VirajSalaka
Repo: wso2/api-platform PR: 1425
File: gateway/gateway-controller/pkg/api/handlers/handlers.go:730-732
Timestamp: 2026-03-22T04:48:40.788Z
Learning: In the wso2/api-platform `gateway/gateway-controller` service, treat `EventHub` as DB-backed: when `s.eventHub != nil`, `s.db` is guaranteed to be non-nil. Therefore, for handlers and related code, it’s sufficient to guard on `s.eventHub == nil` (e.g., skip EventHub-dependent paths) and you should not also add redundant `s.db == nil` checks. This is based on the project invariant that memory-only (non-DB) mode is planned for removal.
Learnt from: nimsara66
Repo: wso2/api-platform PR: 1438
File: gateway/gateway-controller/pkg/utils/api_deployment.go:353-359
Timestamp: 2026-03-23T02:48:39.635Z
Learning: For secret handling during API deployment in the gateway-controller codebase, follow the intended design “resolve at runtime, persist unresolved.” When calling `saveOrUpdateConfig`, always persist the stored (unresolved) config (`storedCfg`) that still contains `$secret{...}` placeholders. Use the resolved/decrypted config (`resolvedCfg`) only for runtime purposes (e.g., values returned in `APIDeploymentResult.StoredConfig` for xDS/policy evaluation). Never persist or write `resolvedCfg` (plaintext secrets) to the DB or in-memory store—reviews should flag any changes that write decrypted secret values to storage or logs.
Learnt from: dushaniw
Repo: wso2/api-platform PR: 1463
File: gateway/gateway-controller/pkg/controlplane/sync.go:323-362
Timestamp: 2026-03-25T04:34:50.763Z
Learning: In the gateway-controller codebase, it is an accepted design to log and continue after failures in in-memory state updates (e.g., store.Update) or event-hub publishes (e.g., eventHub.PublishEvent) when the failure occurs following a successful DB upsert (e.g., UpsertConfig). Do not review-comment this as a bug or require synchronous rollback/error propagation. This applies to sync logic (e.g., processSyncStatusUpdates/processSyncDeletions), websocket event handlers (e.g., handleAPIUndeployedEvent and similar), and the REST service layer (e.g., saveOrUpdateConfig). The DB upsert rollback is intentionally not enforced because prior config state is not retained before overwriting, and gateway restart is relied on to recover from any resulting in-memory/xDS staleness via store reload from the DB.
Learnt from: VirajSalaka
Repo: wso2/api-platform PR: 1493
File: gateway/gateway-controller/tests/integration/schema_test.go:201-202
Timestamp: 2026-03-26T06:54:34.762Z
Learning: For the wso2/api-platform gateway/gateway-controller service, treat “delete and recreate” of the database (during tests/upgrades) as the intentional/accepted upgrade path for SQL schema changes. Do not raise review issues for the absence of schema migrations or SQL schema-version upgrade logic in this service’s database implementation files (e.g., sqlite.go/postgres.go), since this is a deliberate design choice.
Learnt from: Krishanx92
Repo: wso2/api-platform PR: 1496
File: gateway/gateway-controller/pkg/policyxds/combined_cache.go:559-559
Timestamp: 2026-03-26T09:58:10.495Z
Learning: When reviewing Go code in this repo, usages of `cache.NewTestRawResponse` from `github.com/envoyproxy/go-control-plane/pkg/cache/v3` (Go-Control-Plane v0.14.0+) are valid for production use. Do not flag it as a missing/non-existent function or as “test-only”; the function is publicly exported with signature `func NewTestRawResponse(req *discovery.DiscoveryRequest, version string, resources []types.ResourceWithTTL) *RawResponse`.
Learnt from: Krishanx92
Repo: wso2/api-platform PR: 1500
File: gateway/gateway-controller/pkg/utils/mcp_transformer.go:150-153
Timestamp: 2026-03-27T17:26:39.573Z
Learning: In wso2/api-platform’s `gateway/gateway-controller` service, the OpenAPI spec `gateway/gateway-controller/api/management-openapi.yaml` defines the `Policy` schema’s `version` as required (under `required: [name, version]`) with pattern `'^v\d+$'`, and the generated Go model `gateway/gateway-controller/pkg/api/management/generated.go` represents `Policy.Version` as a non-pointer `string`.
During code review, ensure any PR that constructs an `api.Policy` does not omit/leave `Version` unset; leaving it empty will fail the API validation. If the PR’s change requires `Version` to become optional (or otherwise changes its required/validation semantics), you must update `management-openapi.yaml` accordingly (e.g., remove `version` from the `required` list), then regenerate the Go types so validation behavior matches the spec.
Actionable comments posted: 7
🧹 Nitpick comments (2)
event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go (2)
124-138: ⚡ Quick win
`Stop` ignores its `ctx`, and per-connection consumer shutdown uses `context.Background()`.
`Stop(ctx)` accepts a context but never propagates it to `closeConnection`, and `closeConnection` invokes `conn.kafkaConsumer.Stop(context.Background())`. If a caller passes a deadline-bound context to `Stop` to bound shutdown time, that deadline is silently dropped, and a slow consumer shutdown can stall the gateway. Threading `ctx` through `closeConnection` (or at minimum to the consumer `Stop` call during shutdown) would preserve the caller's timeout semantics.
Also applies to: 508-514
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go` around lines 124 - 138, The Stop method on WebBrokerApiReceiver currently ignores its ctx and closeConnection calls conn.kafkaConsumer.Stop with context.Background(); change Stop to pass its ctx into closeConnection (e.g., closeConnection(ctx, conn)) and update closeConnection to accept a context parameter and use that ctx when calling conn.kafkaConsumer.Stop(ctx) so the caller's deadline/cancellation is honored; make the same ctx-threading change for the other shutdown call sites referenced (the other closeConnection invocations around the same connector).
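The difference a threaded context makes can be shown with a deliberately slow consumer. This is a hedged sketch: `consumer`, `slowConsumer`, `stopAll`, `timedOut`, and `millis` are hypothetical helpers, and only the `closeConnection(ctx, conn)` shape follows the suggestion above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// consumer is a hypothetical slice of the per-connection Kafka consumer.
type consumer interface {
	Stop(ctx context.Context) error
}

// slowConsumer simulates a shutdown that takes delay unless ctx ends first.
type slowConsumer struct{ delay time.Duration }

func (s slowConsumer) Stop(ctx context.Context) error {
	select {
	case <-time.After(s.delay):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// closeConnection forwards the caller's ctx instead of context.Background().
func closeConnection(ctx context.Context, c consumer) error {
	return c.Stop(ctx)
}

// stopAll bounds the whole shutdown by timeout and collects per-connection errors.
func stopAll(timeout time.Duration, consumers []consumer) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	var errs []error
	for _, c := range consumers {
		if err := closeConnection(ctx, c); err != nil {
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...)
}

// timedOut reports whether the shutdown hit the caller's deadline.
func timedOut(err error) bool {
	return errors.Is(err, context.DeadlineExceeded)
}

// millis converts a millisecond count into a time.Duration.
func millis(n int) time.Duration {
	return time.Duration(n) * time.Millisecond
}

func main() {
	err := stopAll(millis(20), []consumer{slowConsumer{delay: millis(2000)}})
	fmt.Println("deadline honoured:", timedOut(err))
}
```

With `context.Background()` in `closeConnection`, the same call would block for the consumer's full shutdown time regardless of the timeout passed in.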
209-232: 💤 Low value
Guard against a nil `processed` before dereferencing.
If `ProcessConnectionInitRequest` ever returns `(nil, false, nil)` (no short-circuit, no error, but no result), lines 227–231 will dereference `processed.Headers` and panic. Even if the current implementation always returns a non-nil result on the happy path, a defensive nil-check here makes the handler resilient to future contract changes in `MessageProcessor`. The same consideration applies to the short-circuit branch on lines 213–222.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go` around lines 209 - 232, Guard against a nil processed result from ProcessConnectionInitRequest: before accessing processed.Headers or processed.Value in both the shortCircuited branch and the header-update branch, check if processed == nil and handle it (e.g., log an error via slog or processLogger and return a 500/appropriate response or skip header updates) to avoid panics; update the shortCircuited block that reads processed.Metadata/Headers/Value and the subsequent loop that updates r.Header to first nil-check processed and perform a safe fallback path.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@event-gateway/ARCHITECTURE.md`:
- Around line 14-17: Add missing language identifiers to the fenced code blocks
in ARCHITECTURE.md: locate the blocks that contain the API examples (e.g., the
block with "POST /websub-apis → CreateWebSubAPI()" and "POST /webbroker-apis →
CreateWebBrokerApi()") and the other similar example blocks called out in the
review, and change their opening fences from ``` to ```text (or another
appropriate language tag) so markdownlint MD040 is satisfied and editors render
correctly.
- Line 332: Update the broken internal link fragment that currently points to
"#11-extensibility--plugin-architecture" so it matches the actual heading
number; replace the fragment with "#12-extensibility--plugin-architecture" (or
remove the numeric prefix entirely to use "
`#extensibility--plugin-architecture`") wherever the link text referencing the
Extensibility/Plugin Architecture appears to ensure the anchor resolves to the
"Extensibility -- Plugin Architecture" section.
In
`@event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go`:
- Around line 291-339: channelTopics can be empty and you currently proceed to
call e.brokerDriver.Subscribe which may fail or no-op; change the early-warning
path where len(channelTopics) == 0 to treat it as a fatal handshake error: log
an error (including channelName and connID if already created or create connID
earlier), close the WebSocket, call cancel(), and return before calling
e.brokerDriver.Subscribe so the per-connection consumer
(Subscribe/consumer.Start) is never created for an unconsumable channel; update
the handling around channelTopics, Subscribe, and brokerApiConnection creation
so the function returns early on empty topics.
- Around line 85-98: The handler registered by NewBrokerApiReceiver
(handleUpgrade) can run before Start assigns e.ctx, causing a panic when
handleUpgrade calls context.WithCancel(nil); fix by initializing the receiver's
context field (e.ctx) to context.Background() in the constructor
(NewBrokerApiReceiver) so handleUpgrade always has a non-nil context, and then
in Start replace e.ctx with the provided ctx; alternatively implement a
short-circuit in handleUpgrade to return a clear error if e.ctx is not yet
initialized—but preferred is setting e.ctx = context.Background() in the
constructor and overwriting it in Start.
- Around line 498-525: closeConnection currently closes conn.inbound and
conn.outbound while writers (readLoop and the Kafka handler) may still be
blocked or in-flight, leading to send-on-closed-channel panics; modify
closeConnection and the connection lifecycle to wait for all sender goroutines
to exit before closing channels: introduce a sync.WaitGroup or sender counter on
brokerApiConnection (used by readLoop and the Kafka handler to
increment/decrement), ensure readLoop and the Kafka outbound sender decrement
the wg when they finish (or return) and that closeConnection calls
conn.cancel(), waits for that wg to complete (or another per-connection done
signal) and only then closes conn.inbound and conn.outbound and removes the
connection; keep references to conn.cancel, kafkaConsumer.Stop, readLoop and the
Kafka handler names to find the spots to add wg.Add/Done and the wait.
In `@event-gateway/WEBBROKERAPI.md`:
- Around line 1022-1025: Update the fenced code blocks in WEBBROKERAPI.md to
include a language identifier (e.g., use ```text) so markdownlint MD040 is
satisfied; locate the plain fenced blocks that contain outputs such as
"Connected (press CTRL+C to quit)" and the other listed snippets (lines around
the occurrences you noted) and prepend the fence with an appropriate
language/token (text) for each block.
- Around line 1377-1389: The channel examples (channel name "/default" with
on_produce and on_consume entries using the map-topic policy) are placed at the
top level of the channel but must be nested under the policies key to match the
documented schema; update the entries so on_produce and on_consume live under
policies (e.g., policies: { on_produce: [...], on_consume: [...] }) for the
map-topic v1 examples and apply the same change to the other occurrences
referenced (the blocks around lines 1419-1431 and 1464-1489).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: ec30cb9d-acf2-4c57-aaf8-31dffdfab0bd
📒 Files selected for processing (4)
- event-gateway/ARCHITECTURE.md
- event-gateway/WEBBROKERAPI.md
- event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/consumer.go
- event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go
🚧 Files skipped from review as they are similar to previous changes (1)
- event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/consumer.go
	cfg.Mux.HandleFunc(cfg.Channel.Context, e.handleUpgrade)

	slog.Info("WebBrokerApi receiver registered HTTP handler",
		"channel", cfg.Channel.Name,
		"path", cfg.Channel.Context,
		"mode", cfg.Channel.Mode,
		"port", opts.Port)

	return e, nil
}

// Start initializes the receiver.
func (e *WebBrokerApiReceiver) Start(ctx context.Context) error {
	e.ctx = ctx
Race between handler registration and Start: e.ctx may be nil when an upgrade arrives.
handleUpgrade is registered on the mux inside NewBrokerApiReceiver (line 85), but e.ctx is only assigned in Start (line 98). If a WebSocket request reaches the mux before Start runs, line 297 will call context.WithCancel(nil), which panics. Consider initializing e.ctx to context.Background() in the constructor and replacing it in Start, or rejecting requests with a clear error until Start has been called.
🛡️ Suggested guard in the constructor
e := &WebBrokerApiReceiver{
	channel:      cfg.Channel,
	processor:    cfg.Processor,
	brokerDriver: cfg.BrokerDriver,
	opts:         opts,
	connections:  make(map[string]*brokerApiConnection),
+	ctx:          context.Background(),
}
Will check and consider this as a future improvement
@senthuran16, understood. Would you like me to open a GitHub issue to track this so it doesn't get lost? It would capture the race condition between handler registration in NewBrokerApiReceiver and context assignment in Start, with a reference back to this PR.
🧠 Learnings used
Learnt from: Krishanx92
Repo: wso2/api-platform PR: 1496
File: gateway/gateway-controller/pkg/policyxds/combined_cache.go:559-559
Timestamp: 2026-03-26T09:58:10.495Z
Learning: When reviewing Go code in this repo, usages of `cache.NewTestRawResponse` from `github.com/envoyproxy/go-control-plane/pkg/cache/v3` (Go-Control-Plane v0.14.0+) are valid for production use. Do not flag it as a missing/non-existent function or as “test-only”; the function is publicly exported with signature `func NewTestRawResponse(req *discovery.DiscoveryRequest, version string, resources []types.ResourceWithTTL) *RawResponse`.
// closeConnection closes a connection and cleans up resources.
func (e *WebBrokerApiReceiver) closeConnection(conn *brokerApiConnection) {
	conn.mu.Lock()
	if conn.closed {
		conn.mu.Unlock()
		return
	}
	conn.closed = true
	conn.mu.Unlock()

	conn.cancel()

	if conn.kafkaConsumer != nil {
		if err := conn.kafkaConsumer.Stop(context.Background()); err != nil {
			slog.Error("Failed to stop per-connection consumer", "connID", conn.connID, "error", err)
		}
	}

	close(conn.inbound)
	close(conn.outbound)
	conn.ws.Close()

	e.mu.Lock()
	delete(e.connections, conn.connID)
	e.mu.Unlock()

	slog.Info("WebSocket connection closed", "connID", conn.connID, "channel", e.channel.Name)
}
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify whether the Kafka consumer's Stop() is documented to drain in-flight callbacks
# before returning. The receiver's contract determines whether close(outbound) is safe.
fd -t f 'consumer.go' event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka -x cat {}
ast-grep --pattern $'func ($_ *$_) Stop($_ $_) error {
$$$
}'
rg -nP --type=go -C3 '\bSubscribe\s*\(' event-gateway/gateway-runtime/internal/connectors
Repository: wso2/api-platform
Length of output: 18784
🏁 Script executed:
cat -n event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go | head -450 | tail -100
Repository: wso2/api-platform
Length of output: 3874
🏁 Script executed:
cat -n event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go | sed -n '350,430p'
Repository: wso2/api-platform
Length of output: 3040
🏁 Script executed:
cat -n event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go | sed -n '480,540p'
Repository: wso2/api-platform
Length of output: 1613
🏁 Script executed:
cat -n event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go | sed -n '310,330p'
Repository: wso2/api-platform
Length of output: 841
Race condition: channels closed while senders are active.
`closeConnection` closes `conn.inbound` and `conn.outbound` (lines 516–517) with unguarded senders still potentially in flight:
- `readLoop` is the sole writer to `inbound`. The context check at lines 360–362 is non-blocking; `ReadMessage()` at line 365 does not respond to context cancellation. After `cancel()` is called at line 508, `readLoop` may remain blocked in `ReadMessage()`. When it finally unblocks and attempts to send at line 391, the channel may already be closed, causing a panic.
- The Kafka handler (lines 314–323) is the sole writer to `outbound`. Although `conn.cancel()` is called first at line 508 and `kafkaConsumer.Stop()` blocks until the consumer loop exits via `wg.Wait()`, the handler's context is the consumer loop's context, not the connection's context. A handler in flight when the connection context is cancelled may still execute and attempt to send to an already-closed `outbound` channel, causing a panic.
Consider a synchronization pattern: wait for all senders to exit before closing channels. For example, track active sends with a counter or WaitGroup and verify all goroutines have exited before proceeding to close().
Will check on this and consider as a future improvement
@senthuran16, understood. To ensure this is tracked, would you like me to open a GitHub issue to capture the race condition in closeConnection (unguarded channel closures while readLoop and the Kafka outbound handler may still be active)? That way it won't be lost before a follow-up PR.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@event-gateway/ARCHITECTURE.md`:
- Around line 450-455: The table of WebSub phases in ARCHITECTURE.md is missing
the Unsubscribe phase; update the Phase/Purpose/When Applied rows to either add
a fourth row named "Unsubscribe" (Purpose: Authentication/authorization and
cleanup; When Applied: During unsubscription request to POST /hub or when
subscriber revokes) or explicitly mark unsubscription as out-of-scope with a
note; ensure you reference the existing table context and the "Subscribe",
"Inbound", and "Outbound" phase names so the new row/note stays consistent with
the rest of the document.
In `@event-gateway/README.md`:
- Around line 248-287: The example request body uses the old "broker" and
"allChannels" keys which don't match the WebBrokerApi schema; update the sample
to use "broker-driver" instead of "broker", add a top-level "policies" array
(instead of "allChannels"), and move per-channel policy definitions under each
channel's "policies" key (e.g., in the "prices" channel include "policies": []
and remove "on_connection_init"/"on_produce"/"on_consume" at the top level),
ensuring the "broker-driver" object contains "name", "type", and "properties" as
shown so the payload matches the architecture/reference docs schema.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: fa51d0c5-7f3d-4d06-b886-e8304f43fe6c
📒 Files selected for processing (4)
- event-gateway/ARCHITECTURE.md
- event-gateway/README.md
- event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go
- event-gateway/gateway-runtime/internal/xdsclient/handler.go
🚧 Files skipped from review as they are similar to previous changes (2)
- event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go
- event-gateway/gateway-runtime/internal/xdsclient/handler.go
# Conflicts: # gateway/gateway-controller/pkg/api/management/generated.go
Purpose
WebBrokerApi Walkthrough
The WebBrokerApi enables bidirectional WebSocket ↔ Kafka protocol mediation. This walkthrough demonstrates creating a stock trading API where clients can produce messages to Kafka and consume messages in real-time over WebSocket.
Step 1: Create a WebBroker API
Use the following curl command to create a WebBrokerApi with a `prices` channel that maps to Kafka topics:
This creates a WebBrokerApi where:
- Messages sent by WebSocket clients are published to the `stock.prices` Kafka topic
- Messages from the `dummy.prices` Kafka topic are delivered to the WebSocket client
Step 2: Connect via WebSocket
Install `wscat` if you haven't already:
Connect to the WebBroker API and select the `prices` channel using the `X-channel` header:
wscat -c ws://localhost:8081/stock-trading/v1.0 -H "X-channel: prices"
Once connected, you'll see:
Step 3: Monitor Messages Published to Kafka
In a new terminal, start a Kafka consumer to monitor messages that clients send via WebSocket:
docker exec -it event-gateway-kafka-1 /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic stock.prices \
  --from-beginning
Now, type a message in your WebSocket terminal (Step 2) and press Enter:
The message should appear in the Kafka consumer terminal immediately.
Step 4: Publish Messages from Kafka to WebSocket
In another terminal, start a Kafka producer to send messages that will be delivered to WebSocket clients:
docker exec -it event-gateway-kafka-1 /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic dummy.prices
Type a message in the Kafka producer terminal and press Enter:
The message should appear in your WebSocket terminal (Step 2):
Goals
Approach
User stories
Documentation
Automation tests
Security checks
Samples
Related PRs
Test environment