[DO NOT MERGE] Add Implementation for Exposing Brokers as Subscribable APIs over Web-friendly Protocols #1944

Closed
senthuran16 wants to merge 23 commits into wso2:main from senthuran16:protocol-mediation

@senthuran16 (Member) commented May 12, 2026

Purpose

$Subject, resolves #1867


WebBrokerApi Walkthrough

The WebBrokerApi enables bidirectional WebSocket ↔ Kafka protocol mediation. This walkthrough demonstrates creating a stock trading API where clients can produce messages to Kafka and consume messages in real-time over WebSocket.

Step 1: Create a WebBroker API

Use the following curl command to create a WebBrokerApi with a prices channel that maps to Kafka topics:

curl --location 'http://localhost:9090/api/management/v0.9/webbroker-apis' \
--header 'Content-Type: application/json' \
--header 'Accept: application/json' \
--header 'Authorization: Basic YWRtaW46YWRtaW4=' \
--data '{
  "apiVersion": "gateway.api-platform.wso2.com/v1alpha1",
  "kind": "WebBrokerApi",
  "metadata": {
    "name": "stock-trading-v1.0"
  },
  "spec": {
    "displayName": "Stock Trading WebBroker API",
    "version": "v1.0",
    "context": "/stock-trading/v1.0",
    "receiver": {
      "name": "websocket-receiver",
      "type": "websocket"
    },
    "broker": {
      "name": "kafka-driver",
      "type": "kafka",
      "properties": {
        "brokers": [
          "kafka:29092"
        ]
      }
    },
    "allChannels": {
      "on_connection_init": {
        "policies": []
      },
      "on_produce": {
        "policies": []
      },
      "on_consume": {
        "policies": []
      }
    },
    "channels": {
      "prices": {
        "produceTo": {
          "topic": "stock.prices"
        },
        "consumeFrom": {
          "topic": "dummy.prices"
        },
        "on_connection_init": {
          "policies": []
        },
        "on_produce": {
          "policies": []
        },
        "on_consume": {
          "policies": []
        }
      }
    }
  }
}'

This creates a WebBrokerApi where:

  • Client messages are published to the stock.prices Kafka topic
  • Messages from the dummy.prices Kafka topic are delivered to the WebSocket client
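
The same request can be assembled programmatically. The following is a minimal Go sketch, not part of this PR: `buildCreateRequest`, `channel`, and `topicRef` are hypothetical names, and the payload omits the empty `allChannels` and `on_*` policy blocks on the assumption that the server treats them as optional. `SetBasicAuth("admin", "admin")` produces the same `Authorization` header as the curl command above.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// topicRef and channel mirror only the channel fields used in the walkthrough.
// Both are hypothetical types for this sketch.
type topicRef struct {
	Topic string `json:"topic"`
}

type channel struct {
	ProduceTo   topicRef `json:"produceTo"`
	ConsumeFrom topicRef `json:"consumeFrom"`
}

// buildCreateRequest builds (but does not send) the POST request that creates
// the WebBrokerApi. The empty policy blocks are left out (assumption: optional).
func buildCreateRequest(baseURL string, channels map[string]channel) (*http.Request, error) {
	payload := map[string]any{
		"apiVersion": "gateway.api-platform.wso2.com/v1alpha1",
		"kind":       "WebBrokerApi",
		"metadata":   map[string]any{"name": "stock-trading-v1.0"},
		"spec": map[string]any{
			"displayName": "Stock Trading WebBroker API",
			"version":     "v1.0",
			"context":     "/stock-trading/v1.0",
			"receiver":    map[string]any{"name": "websocket-receiver", "type": "websocket"},
			"broker": map[string]any{
				"name":       "kafka-driver",
				"type":       "kafka",
				"properties": map[string]any{"brokers": []string{"kafka:29092"}},
			},
			"channels": channels,
		},
	}
	body, err := json.Marshal(payload)
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequest(http.MethodPost, baseURL+"/api/management/v0.9/webbroker-apis", bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Accept", "application/json")
	req.SetBasicAuth("admin", "admin") // same credentials as the curl example
	return req, nil
}

func main() {
	req, err := buildCreateRequest("http://localhost:9090", map[string]channel{
		"prices": {ProduceTo: topicRef{"stock.prices"}, ConsumeFrom: topicRef{"dummy.prices"}},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL.String())
}
```

Sending the request is a matter of passing `req` to `http.DefaultClient.Do` once the gateway controller is running.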

Step 2: Connect via WebSocket

Install wscat if you haven't already:

npm install -g wscat

Connect to the WebBroker API and select the prices channel using the X-channel header:

wscat -c ws://localhost:8081/stock-trading/v1.0 -H "X-channel: prices"

Once connected, you'll see:

Connected (press CTRL+C to quit)
> 

Step 3: Monitor Messages Published to Kafka

In a new terminal, start a Kafka consumer to monitor messages that clients send via WebSocket:

docker exec -it event-gateway-kafka-1 /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic stock.prices \
  --from-beginning

Now, type a message in your WebSocket terminal (Step 2) and press Enter:

> {"symbol": "AAPL", "price": 150.25, "timestamp": "2026-05-13T10:30:00Z"}

The message should appear in the Kafka consumer terminal immediately.

Step 4: Publish Messages from Kafka to WebSocket

In another terminal, start a Kafka producer to send messages that will be delivered to WebSocket clients:

docker exec -it event-gateway-kafka-1 /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic dummy.prices

Type a message in the Kafka producer terminal and press Enter:

> {"symbol": "GOOGL", "price": 2750.50, "timestamp": "2026-05-13T10:31:00Z"}

The message should appear in your WebSocket terminal (Step 2):

< {"symbol": "GOOGL", "price": 2750.50, "timestamp": "2026-05-13T10:31:00Z"}

Goals

Describe what solutions this feature or fix introduces to address the problems outlined above.

Approach

Describe how you are implementing the solutions. Include an animated GIF or screenshot if the change affects the UI. Include a link to a Markdown file or Google doc if the feature write-up is too long to paste here.

User stories

Summary of user stories addressed by this change

Documentation

Link(s) to product documentation that addresses the changes of this PR. If no doc impact, enter “N/A” plus brief explanation of why there’s no doc impact

Automation tests

  • Unit tests

    Code coverage information

  • Integration tests

    Details about the test cases and coverage

Security checks

Samples

Provide high-level details about the samples related to this feature

Related PRs

List any other related PRs

Test environment

List all JDK versions, operating systems, databases, and browser/versions on which this feature/fix was tested

@coderabbitai (Bot, Contributor) commented May 12, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

This pull request implements WebBroker API protocol mediation, enabling bidirectional WebSocket-to-Kafka message transformation via declarative APIs and policies. Changes span the controller management plane (REST endpoints, persistence, xDS translation) and event runtime (WebSocket receiver, policy enforcement, dynamic binding). The feature introduces new binding types, connector interfaces for protocol-agnostic policies, and xDS support for dynamic configuration. Documentation includes architecture specifications, REST API contracts, implementation guides, and operational walkthroughs.

Sequence Diagram

sequenceDiagram
  participant User as API User
  participant Ctrl as Gateway Controller
  participant DB as SQLite Config
  participant xDS as xDS Stream
  participant Runtime as Event Runtime
  participant Hub as Message Hub
  participant WSClient as WebSocket Client
  participant Kafka as Kafka

  User->>Ctrl: POST /webbroker-apis (WebBrokerApi spec)
  Ctrl->>DB: store WebBrokerApi config
  Ctrl->>Ctrl: TranslateWebBrokerApisToEventChannelConfigs
  Ctrl->>xDS: push EventChannelConfig resource
  
  xDS->>Runtime: EventChannelResource (channels, policies, receiver, broker-driver)
  Runtime->>Runtime: build policy chains (ConnInit, Produce, Consume)
  Runtime->>Hub: register WebBrokerApi binding
  Runtime->>Runtime: create Kafka broker-driver
  Runtime->>Runtime: instantiate WebBrokerApi receiver (WebSocket)
  
  WSClient->>Runtime: WebSocket upgrade + X-channel header
  Runtime->>Hub: ProcessConnectionInitRequest
  Hub->>Hub: execute on_connection_init.request policies
  Hub-->>Runtime: validated, topic/chain config
  
  alt Policy Short-circuits
    Runtime-->>WSClient: 403 Forbidden
  else Policies Accept
    Runtime->>Kafka: subscribe to consume topics
    Runtime->>Hub: ProcessConnectionInitResponse
    Hub-->>Runtime: response headers applied
    
    loop Bidirectional Flow
      WSClient->>Runtime: produce message
      Runtime->>Hub: ProcessProduce(chainKey)
      Hub-->>Runtime: transformed message + target topic
      Runtime->>Kafka: publish(produceTopic, message)
      
      Kafka->>Runtime: message from consumeTopics
      Runtime->>Hub: ProcessConsume(chainKey)
      Hub-->>Runtime: transformed message
      Runtime->>WSClient: WebSocket binary frame
    end
  end
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 inconclusive)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Description check | ❓ Inconclusive | The PR description provides a concrete walkthrough demonstrating WebBrokerApi functionality with curl/wscat/Kafka examples. However, most required template sections (Goals, Approach, User stories, Documentation, Automation tests, Security checks, Samples, Related PRs, Test environment) are present only as headers with no substantive content. | Complete the template sections with specific details: Goals/Approach explaining the implementation strategy, user stories, documentation links, test coverage metrics, security validation confirmation, sample descriptions, and test environment specifications. |
✅ Passed checks (4 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Linked Issues check | ✅ Passed | The PR implements protocol mediation capabilities enabling bidirectional WebSocket↔Kafka mediation via WebBrokerApi, with complete REST management endpoints, runtime binding support, and policy chains, directly satisfying issue #1867's objective. |
| Out of Scope Changes check | ✅ Passed | All changes are focused on WebBrokerApi implementation and related protocol-mediation infrastructure. Updates to WebSubApi (schema/endpoints/API key management) are in-scope extensions of the shared protocol-mediation framework. No unrelated refactoring or side changes detected. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 85.71% which is sufficient. The required threshold is 80.00%. |
| Title check | ✅ Passed | The title accurately describes the main implementation objective: exposing brokers as subscribable APIs over web-friendly protocols (WebBrokerApi). It reflects the core feature addition throughout the changeset. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions (Contributor)

Dependency Validation Results

Dependency name: golang.org/x/crypto
Version: v0.48.0 (was v0.47.0)
Allowed range: >=v0.31.0
Approved: ✅ Yes

Dependency name: golang.org/x/mod
Version: v0.33.0 (was v0.32.0)
Approved: ❌ No - Module not found in dependency registry

Dependency name: github.com/gorilla/websocket
Version: v1.5.4-0.20250319132907-e064f32e3674 (was v1.5.3)
Approved: ❌ No - Module not found in dependency registry

Dependency name: github.com/wso2/api-platform/sdk/core
Version: v0.2.9
Approved: ❌ No - Module not found in dependency registry

Dependency name: golang.org/x/mod
Version: v0.33.0 (was v0.32.0)
Approved: ❌ No - Module not found in dependency registry

Dependency name: github.com/gorilla/websocket
Version: v1.5.4-0.20250319132907-e064f32e3674 (was v1.5.3)
Approved: ❌ No - Module not found in dependency registry

Dependency name: github.com/knadh/koanf/providers/file
Version: v1.2.1 (was v1.2.0)
Allowed range: >=v1.2.1
Approved: ✅ Yes

Dependency name: github.com/gorilla/websocket
Version: v1.5.4-0.20250319132907-e064f32e3674 (was v1.5.3)
Approved: ❌ No - Module not found in dependency registry

Dependency name: github.com/gorilla/websocket
Version: v1.5.4-0.20250319132907-e064f32e3674 (was v1.5.3)
Approved: ❌ No - Module not found in dependency registry

Dependency name: github.com/mattn/go-sqlite3
Version: v1.14.41 (was v1.14.34)
Allowed range: >=v1.14.32
Approved: ✅ Yes


Next Steps

  1. Review the validation failures listed above
  2. Check if dependencies are in the approved dependency list
  3. Options to resolve:
    • Remove the unapproved dependencies from this PR
    • OR submit a PR to add these dependencies to the approved list in engineering-governance
  4. Once resolved, push changes to re-run validation

This PR is blocked until all dependencies are approved.

⚠️ Please verify the scope of the dependencies usage is necessary

@senthuran16 marked this pull request as ready for review May 13, 2026 12:49

@coderabbitai (Bot, Contributor) left a comment

Actionable comments posted: 10

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
event-gateway/gateway-runtime/internal/runtime/runtime.go (1)

445-490: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Risk of binding twice to the same ports when static + xDS are both configured.

When ControlPlane.Enabled is true, Run unconditionally creates and appends a WebSocket server (and WebSub HTTP/HTTPS servers) bound to the configured ports. If LoadChannels already added matching servers at Lines 406-428 because static bindings were present, both will call ListenAndServe on the same port and one will fail with address already in use.

Consider gating these creations on whether servers for that port/role already exist in r.servers, or making xDS mode mutually exclusive with static channel binding loading.
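
One way to picture the gating option is the sketch below. It is an illustration under stated assumptions, not the runtime's actual code: `managedServer`, `runtimeServers`, and `ensureServer` are hypothetical stand-ins for the managed server type, the `r.servers` slice, and the lookup that would precede `newManagedServer`.

```go
package main

import "fmt"

// managedServer is a hypothetical stand-in for the runtime's managed server.
type managedServer struct {
	role string
	port int
}

// runtimeServers is a hypothetical stand-in for the struct holding r.servers.
type runtimeServers struct {
	servers []managedServer
}

// ensureServer registers a server for the given role and port only if no
// server already claims that port. It reports whether a new entry was added.
func (r *runtimeServers) ensureServer(role string, port int) bool {
	for _, s := range r.servers {
		if s.port == port {
			return false // already bound by an earlier path; skip
		}
	}
	r.servers = append(r.servers, managedServer{role: role, port: port})
	return true
}

func main() {
	r := &runtimeServers{}
	fmt.Println(r.ensureServer("WebSocket", 8081)) // static channel loading: true
	fmt.Println(r.ensureServer("WebSocket", 8081)) // control-plane branch: false
	fmt.Println(len(r.servers))                    // 1
}
```

Keying the lookup on the port alone is a deliberate simplification: two servers cannot share a port regardless of role, so a port match is enough to skip creation.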

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@event-gateway/gateway-runtime/internal/runtime/runtime.go` around lines 445 -
490, The current Run block unconditionally creates WebSocket/WebSub servers when
r.cfg.ControlPlane.Enabled is true, which can duplicate servers already added by
LoadChannels; modify Run to first check r.servers for an existing managed server
matching the intended role/port before calling newManagedServer and
appending/running it. Specifically, in the ControlPlaneEnabled branch around the
WebSocket/WebSub creation, use a lookup (by port and role/name) against
r.servers (the slice populated by LoadChannels) and only call
newManagedServer("WebSocket", ...), newManagedServer("WebSub-HTTP", ...), or
newManagedServer("WebSub-HTTPS", ...) and start r.runServer if no existing
server is found; keep existing error handling and locking semantics.
🧹 Nitpick comments (6)
gateway/gateway-controller/pkg/utils/api_deployment.go (1)

657-746: 💤 Low value

Update documentation and consider extracting vhost resolution logic to reduce duplication.

The function-level doc comment (lines 653–656) and the APIDeploymentParams.Kind comment (line 47) still reference only RestAPI and WebSubAPI, but the code now handles WebBrokerApi as well. Update both to include the third kind.

Additionally, the three case blocks in resolveVhostSentinels are nearly identical—they differ only in the type assertion—and duplicate ~60 lines of logic. Extracting the vhost resolution logic into a helper function that operates on the common anonymous Vhosts struct would eliminate this repetition and make future vhost-field changes or additional kinds a single-line addition.

♻️ Possible helper extraction sketch
// resolveVhostsInPlace fills or resolves a vhosts pointer using the gateway-default
// sentinel rules, returning the (possibly new) pointer.
func resolveVhostsInPlace(
	vhosts *struct {
		Main    string  `json:"main" yaml:"main"`
		Sandbox *string `json:"sandbox,omitempty" yaml:"sandbox,omitempty"`
	},
	routerCfg *config.RouterConfig,
) *struct {
	Main    string  `json:"main" yaml:"main"`
	Sandbox *string `json:"sandbox,omitempty" yaml:"sandbox,omitempty"`
} {
	if vhosts == nil {
		out := &struct {
			Main    string  `json:"main" yaml:"main"`
			Sandbox *string `json:"sandbox,omitempty" yaml:"sandbox,omitempty"`
		}{Main: routerCfg.VHosts.Main.Default}
		if sb := routerCfg.VHosts.Sandbox.Default; sb != "" {
			out.Sandbox = &sb
		}
		return out
	}
	if vhosts.Main == constants.VHostGatewayDefault {
		vhosts.Main = routerCfg.VHosts.Main.Default
	}
	if vhosts.Sandbox != nil && *vhosts.Sandbox == constants.VHostGatewayDefault {
		if resolved := routerCfg.VHosts.Sandbox.Default; resolved != "" {
			vhosts.Sandbox = &resolved
		} else {
			vhosts.Sandbox = nil
		}
	}
	return vhosts
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@gateway/gateway-controller/pkg/utils/api_deployment.go` around lines 657 -
746, The comments mentioning supported kinds are out of date: update the
function-level doc for resolveVhostSentinels and the APIDeploymentParams.Kind
comment to include WebBrokerApi; then remove the duplicated vhost-handling
blocks by extracting the common logic into a helper (e.g., resolveVhostsInPlace)
that accepts/returns the anonymous VHosts struct and routerCfg, and call that
helper from each type assertion branch (api.RestAPI, api.WebSubAPI,
api.WebBrokerApi) to set c.Spec.Vhosts and *cfg accordingly.
event-gateway/gateway-runtime/internal/hub/hub.go (2)

533-562: ⚖️ Poor tradeoff

Chain key reuse may cause confusion.

Line 543 reuses SubscribeChainKey to store on_connection_init.request policies. This overloads the semantic meaning of SubscribeChainKey and conflates two distinct lifecycle events (subscription vs. connection initialization). If a WebBroker API needs different policies for WebSocket handshake vs. topic subscription, this reuse creates ambiguity in configuration and debugging.

Consider adding a dedicated ConnectionInitRequestChainKey field to ChannelBinding to clarify intent and support independent policy configuration.
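
A minimal sketch of the suggested field with a backward-compatible fallback is shown below. The `ChannelBinding` here is a trimmed, hypothetical version of the hub type, `connInitChainKey` is an invented helper, and the key strings are made up for illustration.

```go
package main

import "fmt"

// ChannelBinding is a trimmed, hypothetical version of the hub type that
// carries only the two chain keys discussed in this comment.
type ChannelBinding struct {
	SubscribeChainKey             string
	ConnectionInitRequestChainKey string // proposed dedicated field
}

// connInitChainKey prefers the dedicated key and falls back to
// SubscribeChainKey for bindings that have not set the new field.
func connInitChainKey(b ChannelBinding) string {
	if b.ConnectionInitRequestChainKey != "" {
		return b.ConnectionInitRequestChainKey
	}
	return b.SubscribeChainKey
}

func main() {
	legacy := ChannelBinding{SubscribeChainKey: "prices/subscribe"}
	migrated := ChannelBinding{
		SubscribeChainKey:             "prices/subscribe",
		ConnectionInitRequestChainKey: "prices/conn-init-request",
	}
	fmt.Println(connInitChainKey(legacy))   // prices/subscribe
	fmt.Println(connInitChainKey(migrated)) // prices/conn-init-request
}
```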

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@event-gateway/gateway-runtime/internal/hub/hub.go` around lines 533 - 562,
The code incorrectly reuses ChannelBinding.SubscribeChainKey inside
ProcessConnectionInitRequest to run on_connection_init.request policies, which
conflates subscription and connection-init lifecycle policies; add a new field
on ChannelBinding (e.g., ConnectionInitRequestChainKey) and update
ProcessConnectionInitRequest to use
h.engine.GetChain(binding.ConnectionInitRequestChainKey) and
ExecuteRequestHeaderPolicies with that key (falling back to SubscribeChainKey
only if desired), adjust MessageToRequestHeaderContext/Application calls to
reference the new key where appropriate, and ensure logging (e.g.,
logShortCircuit) and error messages include the new chain key symbol for
clarity.

609-660: 💤 Low value

Silent no-op when chain is missing may hinder debugging.

Lines 626-628 silently return (msg, false, nil) when the chain key is registered but h.engine.GetChain(chainKey) returns nil. The comment "this might be OK if the chain has no policies" suggests this is intentional, but in practice a registered chain key that resolves to nil often indicates a configuration error (e.g., chain not loaded, xDS sync failure, typo in chain key).

Emitting a debug or warning log when a non-empty chain key resolves to nil would aid troubleshooting without failing the request:

📋 Suggested logging enhancement
 	chain := h.engine.GetChain(chainKey)
 	if chain == nil {
-		// Chain key not found - this might be OK if the chain has no policies
+		slog.Debug("Chain key not found in policy engine; skipping policy execution",
+			"binding", bindingName,
+			"chain_key", chainKey)
 		return msg, false, nil
 	}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@event-gateway/gateway-runtime/internal/hub/hub.go` around lines 609 - 660,
ProcessByChainKey currently silently returns when h.engine.GetChain(chainKey)
yields nil, which hides configuration issues; update the nil branch after
calling h.engine.GetChain(chainKey) to emit a warning/debug log that includes
the chainKey and bindingName (e.g., use the Hub logger like h.logger.Warnf or
equivalent) indicating the chain resolved to nil, then preserve the existing
behavior of returning (msg, false, nil). Reference: ProcessByChainKey and
h.engine.GetChain(chainKey).
event-gateway/gateway-runtime/internal/xdsclient/handler.go (2)

240-327: 🏗️ Heavy lift

Repeated map[string]interface{} traversal is brittle and hard to maintain.

toWebSubApiBinding and toWebBrokerApiBinding reimplement JSON decoding by hand. Since you already have data (the raw JSON bytes) just before json.Unmarshal(data, &ecr) at Line 185, a cleaner approach is to keep ecr.Channels/ecr.Policies as json.RawMessage and decode them into the kind-specific structs ([]ChannelEntry/PoliciesEntry or map[string]WebBrokerChannelEntry/ProtocolMediationPolicies) once Kind is known. That eliminates the manual type-assertion ladder, reduces panic surface on unexpected shapes, and keeps the typed structs (Lines 73-95) authoritative.
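
The deferred-decoding idea could look roughly like the sketch below. All type names are trimmed, hypothetical versions of the ones mentioned in this comment, the `"WebSubApi"` kind string is an assumption, and the channel shapes are guessed from the walkthrough payload rather than taken from the handler.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// eventChannelResource keeps Channels raw until Kind is known.
type eventChannelResource struct {
	Kind     string          `json:"kind"`
	Channels json.RawMessage `json:"channels"`
}

// channelEntry is an assumed list-style channel shape for WebSub.
type channelEntry struct {
	Name string `json:"name"`
}

type topicRef struct {
	Topic string `json:"topic"`
}

// webBrokerChannelEntry is an assumed map-style channel shape for WebBroker.
type webBrokerChannelEntry struct {
	ProduceTo   topicRef `json:"produceTo"`
	ConsumeFrom topicRef `json:"consumeFrom"`
}

// decodeChannelNames decodes the envelope first, then the kind-specific
// channel payload. A shape mismatch surfaces as an error, not a panic.
func decodeChannelNames(data []byte) ([]string, error) {
	var ecr eventChannelResource
	if err := json.Unmarshal(data, &ecr); err != nil {
		return nil, err
	}
	switch ecr.Kind {
	case "WebSubApi": // assumed kind string
		var entries []channelEntry
		if err := json.Unmarshal(ecr.Channels, &entries); err != nil {
			return nil, fmt.Errorf("decode WebSubApi channels: %w", err)
		}
		names := make([]string, 0, len(entries))
		for _, e := range entries {
			names = append(names, e.Name)
		}
		return names, nil
	case "WebBrokerApi":
		var entries map[string]webBrokerChannelEntry
		if err := json.Unmarshal(ecr.Channels, &entries); err != nil {
			return nil, fmt.Errorf("decode WebBrokerApi channels: %w", err)
		}
		names := make([]string, 0, len(entries))
		for name := range entries {
			names = append(names, name)
		}
		sort.Strings(names) // map iteration order is random; sort for stable output
		return names, nil
	default:
		return nil, fmt.Errorf("unsupported kind %q", ecr.Kind)
	}
}

func main() {
	raw := []byte(`{"kind":"WebBrokerApi","channels":{"prices":{"produceTo":{"topic":"stock.prices"},"consumeFrom":{"topic":"dummy.prices"}}}}`)
	names, err := decodeChannelNames(raw)
	fmt.Println(names, err) // [prices] <nil>
}
```

With this layout the typed structs remain the single description of each shape, and an unexpected payload is reported by `json.Unmarshal` instead of failing a type assertion.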

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@event-gateway/gateway-runtime/internal/xdsclient/handler.go` around lines 240
- 327, The toWebSubApiBinding function is manually traversing ecr.Channels and
ecr.Policies as map[string]interface{} which is brittle; instead change the
EventChannelResource struct so Channels and Policies are json.RawMessage (or
keep raw data from earlier) and, once Kind is known, unmarshal ecr.Channels into
[]ChannelEntry and ecr.Policies into PoliciesEntry (or the appropriate typed
structs) before calling toWebSubApiBinding; update toWebSubApiBinding to remove
the type-assertion ladders and use the decoded []ChannelEntry/PoliciesEntry
directly (referencing toWebSubApiBinding, EventChannelResource, ChannelEntry,
and PoliciesEntry to locate the changes).

66-95: ⚡ Quick win

Unused struct definitions for nested decoding.

ProtocolMediationPolicies and ConnectionInitPolicies are declared but never used — toWebBrokerApiBinding decodes the same shapes manually from map[string]interface{} (Lines 422-439). Additionally, the doc comments above describe on_connection_init as a ConnectionInitPolicies with request/response, while the runtime decoder at Line 426 treats it as a flat []interface{} array, making the type description misleading.

Consider either driving the decoding through these structs (via json.Unmarshal on the inner payload) or removing the unused types and aligning the doc comment with the flat-array shape actually consumed.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@event-gateway/gateway-runtime/internal/xdsclient/handler.go` around lines 66
- 95, The declared types ProtocolMediationPolicies and ConnectionInitPolicies
are unused and their doc comment for on_connection_init mismatches the runtime
decoder in toWebBrokerApiBinding; fix by either (A) using these structs to
decode the inner payloads instead of manual map[string]interface{} parsing —
e.g. in toWebBrokerApiBinding unmarshal the channels' "policies" and
"on_connection_init" JSON into ProtocolMediationPolicies/ConnectionInitPolicies
so the types are actually exercised, or (B) remove ProtocolMediationPolicies and
ConnectionInitPolicies and update the comments to reflect the flat []interface{}
shape the decoder expects; update toWebBrokerApiBinding accordingly and keep
symbols ProtocolMediationPolicies, ConnectionInitPolicies,
WebBrokerChannelEntry, and toWebBrokerApiBinding in sync.
event-gateway/gateway-runtime/internal/runtime/runtime.go (1)

814-874: 💤 Low value

Two helpers diverge subtly; consider consolidating.

extractTopicsFromChannelPolicies (consume only) and extractAllTopicsFromChannelPolicies (produce + consume) duplicate the same defaulting logic. The latter sets hasConsumeTopics but always falls back to the normalized channel name in both branches at Lines 858-866, so the two flags currently produce identical output for the produce/consume defaults. A single helper returning a {produce, consume []string} struct would remove the duplication and the easy-to-miss double assignment.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@event-gateway/gateway-runtime/internal/runtime/runtime.go` around lines 814 -
874, Both helpers duplicate defaulting/dedup logic; replace
extractTopicsFromChannelPolicies and extractAllTopicsFromChannelPolicies with a
single helper (e.g., extractProduceConsumeTopics) that returns a struct or two
slices {Produce []string, Consume []string}; implement the logic once:
collect/uniq produce topics from channelDef.ProduceTo (fall back to
binding.NormalizeTopicSegment(channelName) when none), collect/uniq consume
topics from channelDef.ConsumeFrom (fall back to
binding.NormalizeTopicSegment(channelName) when none), and return de-duplicated
slices for each; update all call sites to use the new helper and remove the old
functions.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@docs/rest-apis/gateway/schemas.md`:
- Around line 1528-1529: The docs currently allow a WebBroker channel with
neither produceTo nor consumeFrom; update the OpenAPI schema for the WebBroker
channel to require at least one mapping by adding an anyOf (or oneOf) clause
that enforces required: ["produceTo"] OR required: ["consumeFrom"], and then
update the markdown docs (the table rows for produceTo and consumeFrom) to
mention the new rule (e.g., "At least one of produceTo or consumeFrom is
required"). Reference the produceTo and consumeFrom properties and the WebBroker
channel schema when making the change so the generated docs and source OpenAPI
reflect the constraint.
- Around line 774-776: The examples for WebSubAPIRequest and WebSubAPI must be
updated to match the new WebhookAPIData contract: replace array-based channels
with an object keyed by channel name (matching the documented property name
channels) and change per-channel policy definitions to use event-scoped policies
(as defined by WebSubAllChannelPolicies and WebSubChannel) instead of the old
array entries with method; update any example payloads and descriptions to show
a channel name key mapped to a WebSubChannel object containing policies
organized by event type and any per-channel overrides under additionalProperties
semantics so examples align with WebhookAPIData, WebSubChannel,
WebSubAllChannelPolicies, channels and policies.

In `@docs/rest-apis/gateway/websub-api-management.md`:
- Around line 512-516: The example payload for the "Update API key" endpoint
doesn't match the documented schema APIKeyUpdateRequest: the example shows a
"name" property while the schema defines no request properties; fix by either
adding a "name" string property (and description) to the APIKeyUpdateRequest
schema so the example is valid, or change the example JSON to an empty object
(or whatever the schema expects) and update the example caption to reflect no
body required—ensure you modify the APIKeyUpdateRequest schema and the example
block consistently so clients see a single authoritative contract.

In `@event-gateway/gateway-runtime/internal/connectors/types.go`:
- Around line 52-60: Add a brief comment above the ProcessConnectionInitResponse
method in the interface explaining that, unlike
ProcessConnectionInitRequest/ProcessProduce/ProcessConsume which return
(*Message, bool, error) and support policy chain short-circuiting,
ProcessConnectionInitResponse intentionally does not support short-circuiting
and therefore omits the bool; note that response policies always run to
completion and implementations must not rely on a ShortCircuited flag for this
method.

In `@event-gateway/gateway-runtime/internal/hub/hub.go`:
- Around line 564-591: ProcessConnectionInitResponse is missing a check for
policy short-circuiting after executing request header policies for
binding.OutboundChainKey: after calling h.engine.ExecuteRequestHeaderPolicies
(and before ApplyRequestHeaderResult) inspect result.ShortCircuited and handle
it consistently with other flows (return early). Update
ProcessConnectionInitResponse to treat a short-circuited result the same way as
other methods — either return an appropriate error (to reject the handshake) or
change the function signature if the caller must distinguish short-circuit
success vs. failure; ensure you reference the result returned by
ExecuteRequestHeaderPolicies and use result.ShortCircuited to decide the early
return.

In `@event-gateway/gateway-runtime/internal/runtime/runtime.go`:
- Around line 1123-1269: AddWebBrokerApiBinding computes allTopics but never
stores them into r.bindingTopics[wbb.Name], and RemoveWebBrokerApiBinding does
not delete those Kafka topics on teardown; update AddWebBrokerApiBinding (after
building ch/Topics metadata and before releasing the lock) to set
r.bindingTopics[wbb.Name] = allTopics, and update RemoveWebBrokerApiBinding to
call the corresponding broker driver DeleteTopics (or r.registry-created
brokerDriver.DeleteTopics/bd.DeleteTopics with r.bindingTopics[name]) during
removal cleanup (or add a clear comment if topics are intentionally retained) so
WebBrokerApi has the same topic lifecycle as
AddWebSubApiBinding/RemoveWebSubApiBinding.
- Around line 294-403: Channel-level policy chain keys built by
buildWebBrokerApiPolicyChains are placed only into Metadata["channelChains"] but
not recorded on the hub ChannelBinding, so unregisterBindingChains cannot clean
them up; update the binding registration to populate a ChannelChainKeys (or
similar) field on hub.ChannelBinding with per-channel
ConnInitReq/ConnInitResp/Produce/Consume keys (derive from channelChains or
channelChainsToMap) so unregisterBindingChains can remove them. Also unify
BrokerDriver config resolution: instead of directly using
wbb.BrokerDriver.Config with a fallback to Properties here (where you call
r.registry.CreateBrokerDriver), reuse the same broker-driver config resolution
helper used by AddWebBrokerApiBinding (the existing code path that reads
wbb.BrokerDriver.Config) so both static and dynamic load paths resolve
Config/Properties identically before calling r.registry.CreateBrokerDriver.
- Around line 131-136: The constructor-created r.wsMux (and r.websubMux) is
being shadowed in LoadChannels by local wsMux and websubMux variables; replace
the local allocations in LoadChannels with reuse of the existing r.wsMux and
r.websubMux so bindings register on the same mux instance the New constructor
created (and which Run later wires into the WebSocket server), ensuring
AddWebBrokerApiBinding and other registration calls always target
r.wsMux/r.websubMux rather than a locally scoped http.NewServeMux().

In `@event-gateway/gateway-runtime/internal/xdsclient/handler.go`:
- Around line 182-194: The logs in handler.go are emitting sensitive broker
configuration: replace the current slog.Info calls that log the raw JSON and
ecr.BrokerDriver with non-production-safe output — either lower them to
slog.Debug and gate behind debug-level logging, or remove them; specifically
stop logging the entire raw JSON and the ecr.BrokerDriver.Properties field
(redact or omit Properties) when printing EventChannelResource (type
EventChannelResource and variable ecr), and if you keep a debug log create a
small sanitized summary (UUID, Name, Kind, broker driver type only) rather than
the full BrokerDriver object.

In `@gateway/gateway-controller/pkg/utils/api_deployment.go`:
- Around line 311-319: This branch skips validation for api.WebBrokerApi (case
api.WebBrokerApi) which leaves downstream calls like validateArtifactConflicts
and the store/render flow operating on unvalidated specs; create a tracking
issue for adding WebBrokerApi support to config.Validator and reference that
issue ID in the TODO comment, and make the unsupported-kind path in
s.validator.Validate defensive (ensure validator.Validate returns a clear
validation error or empty-but-explicit result for unknown kinds so the caller
(the switch handling api.WebBrokerApi) can either log and return a
ValidationErrorListError via s.logValidationErrors/ValidationErrorListError or
otherwise abort processing instead of proceeding silently).

---

Outside diff comments:
In `@event-gateway/gateway-runtime/internal/runtime/runtime.go`:
- Around line 445-490: The current Run block unconditionally creates
WebSocket/WebSub servers when r.cfg.ControlPlane.Enabled is true, which can
duplicate servers already added by LoadChannels; modify Run to first check
r.servers for an existing managed server matching the intended role/port before
calling newManagedServer and appending/running it. Specifically, in the
ControlPlaneEnabled branch around the WebSocket/WebSub creation, use a lookup
(by port and role/name) against r.servers (the slice populated by LoadChannels)
and only call newManagedServer("WebSocket", ...),
newManagedServer("WebSub-HTTP", ...), or newManagedServer("WebSub-HTTPS", ...)
and start r.runServer if no existing server is found; keep existing error
handling and locking semantics.
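
The lookup-before-create step might look roughly like the sketch below. The managedServer and Runtime shapes are simplified, hypothetical stand-ins (the real newManagedServer signature is not shown in this thread), and locking is omitted for brevity.

```go
package main

import "fmt"

// managedServer is a hypothetical, reduced stand-in for the runtime's
// managed server type.
type managedServer struct {
	name string
	port int
}

// Runtime holds the servers slice that LoadChannels and Run both append to.
type Runtime struct {
	servers []*managedServer
}

// findServer returns an already registered server with the same role name
// and port, or nil when none exists.
func (r *Runtime) findServer(name string, port int) *managedServer {
	for _, s := range r.servers {
		if s.name == name && s.port == port {
			return s
		}
	}
	return nil
}

// ensureServer creates and registers a server only when no matching one is
// present. The bool reports whether a new server was created, so the caller
// knows whether it still has to start it.
func (r *Runtime) ensureServer(name string, port int) (*managedServer, bool) {
	if existing := r.findServer(name, port); existing != nil {
		return existing, false
	}
	srv := &managedServer{name: name, port: port}
	r.servers = append(r.servers, srv)
	return srv, true
}

func main() {
	r := &Runtime{}
	r.ensureServer("WebSocket", 8080)               // as LoadChannels might
	_, created := r.ensureServer("WebSocket", 8080) // as Run might
	fmt.Println("created second server:", created, "total servers:", len(r.servers))
}
```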

---

Nitpick comments:
In `@event-gateway/gateway-runtime/internal/hub/hub.go`:
- Around line 533-562: The code incorrectly reuses
ChannelBinding.SubscribeChainKey inside ProcessConnectionInitRequest to run
on_connection_init.request policies, which conflates subscription and
connection-init lifecycle policies; add a new field on ChannelBinding (e.g.,
ConnectionInitRequestChainKey) and update ProcessConnectionInitRequest to use
h.engine.GetChain(binding.ConnectionInitRequestChainKey) and
ExecuteRequestHeaderPolicies with that key (falling back to SubscribeChainKey
only if desired), adjust MessageToRequestHeaderContext/Application calls to
reference the new key where appropriate, and ensure logging (e.g.,
logShortCircuit) and error messages include the new chain key symbol for
clarity.
- Around line 609-660: ProcessByChainKey currently silently returns when
h.engine.GetChain(chainKey) yields nil, which hides configuration issues; update
the nil branch after calling h.engine.GetChain(chainKey) to emit a warning/debug
log that includes the chainKey and bindingName (e.g., use the Hub logger like
h.logger.Warnf or equivalent) indicating the chain resolved to nil, then
preserve the existing behavior of returning (msg, false, nil). Reference:
ProcessByChainKey and h.engine.GetChain(chainKey).

In `@event-gateway/gateway-runtime/internal/runtime/runtime.go`:
- Around line 814-874: Both helpers duplicate defaulting/dedup logic; replace
extractTopicsFromChannelPolicies and extractAllTopicsFromChannelPolicies with a
single helper (e.g., extractProduceConsumeTopics) that returns a struct or two
slices {Produce []string, Consume []string}; implement the logic once:
collect/uniq produce topics from channelDef.ProduceTo (fall back to
binding.NormalizeTopicSegment(channelName) when none), collect/uniq consume
topics from channelDef.ConsumeFrom (fall back to
binding.NormalizeTopicSegment(channelName) when none), and return de-duplicated
slices for each; update all call sites to use the new helper and remove the old
functions.
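
One possible shape for the merged helper is sketched below. TopicRef and ChannelDef are simplified stand-ins modeled on the ProduceTo/ConsumeFrom fields visible in the quoted code, and normalizeTopicSegment is only a placeholder for binding.NormalizeTopicSegment, whose real rules are not shown in this thread.

```go
package main

import (
	"fmt"
	"strings"
)

// TopicRef and ChannelDef are hypothetical, reduced versions of the channel
// definition types.
type TopicRef struct{ Topic string }

type ChannelDef struct {
	ProduceTo   *TopicRef
	ConsumeFrom *TopicRef
}

// ChannelTopics carries produce and consume topics separately.
type ChannelTopics struct {
	Produce []string
	Consume []string
}

// normalizeTopicSegment is a placeholder; the real normalization may differ.
func normalizeTopicSegment(s string) string {
	return strings.ReplaceAll(strings.Trim(s, "/"), "/", ".")
}

// topicOrDefault applies the defaulting rule exactly once.
func topicOrDefault(ref *TopicRef, channelName string) string {
	if ref != nil && ref.Topic != "" {
		return ref.Topic
	}
	return normalizeTopicSegment(channelName)
}

// extractProduceConsumeTopics replaces the two near-duplicate helpers.
func extractProduceConsumeTopics(channelName string, def ChannelDef) ChannelTopics {
	return ChannelTopics{
		Produce: []string{topicOrDefault(def.ProduceTo, channelName)},
		Consume: []string{topicOrDefault(def.ConsumeFrom, channelName)},
	}
}

// All returns the de-duplicated union, preserving first-seen order.
func (t ChannelTopics) All() []string {
	seen := make(map[string]struct{})
	var out []string
	for _, list := range [][]string{t.Produce, t.Consume} {
		for _, topic := range list {
			if _, ok := seen[topic]; ok {
				continue
			}
			seen[topic] = struct{}{}
			out = append(out, topic)
		}
	}
	return out
}

func main() {
	topics := extractProduceConsumeTopics("prices", ChannelDef{ProduceTo: &TopicRef{Topic: "stock.prices"}})
	fmt.Println(topics.Produce, topics.Consume, topics.All())
}
```

Call sites that need only consume topics would read the Consume field, and sites that must ensure topics exist would call All, so the defaulting logic stays in one place.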

In `@event-gateway/gateway-runtime/internal/xdsclient/handler.go`:
- Around line 240-327: The toWebSubApiBinding function is manually traversing
ecr.Channels and ecr.Policies as map[string]interface{} which is brittle;
instead change the EventChannelResource struct so Channels and Policies are
json.RawMessage (or keep raw data from earlier) and, once Kind is known,
unmarshal ecr.Channels into []ChannelEntry and ecr.Policies into PoliciesEntry
(or the appropriate typed structs) before calling toWebSubApiBinding; update
toWebSubApiBinding to remove the type-assertion ladders and use the decoded
[]ChannelEntry/PoliciesEntry directly (referencing toWebSubApiBinding,
EventChannelResource, ChannelEntry, and PoliciesEntry to locate the changes).
- Around line 66-95: The declared types ProtocolMediationPolicies and
ConnectionInitPolicies are unused and their doc comment for on_connection_init
mismatches the runtime decoder in toWebBrokerApiBinding; fix by either (A) using
these structs to decode the inner payloads instead of manual
map[string]interface{} parsing — e.g. in toWebBrokerApiBinding unmarshal the
channels' "policies" and "on_connection_init" JSON into
ProtocolMediationPolicies/ConnectionInitPolicies so the types are actually
exercised, or (B) remove ProtocolMediationPolicies and ConnectionInitPolicies
and update the comments to reflect the flat []interface{} shape the decoder
expects; update toWebBrokerApiBinding accordingly and keep symbols
ProtocolMediationPolicies, ConnectionInitPolicies, WebBrokerChannelEntry, and
toWebBrokerApiBinding in sync.
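
The deferred-decoding idea can be illustrated with encoding/json's RawMessage. The JSON field names and the reduced struct shapes here are assumptions made for the example, not the actual xDS payload schema.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// EventChannelResource keeps Channels undecoded until Kind is known.
// Field names and tags are assumptions for this sketch.
type EventChannelResource struct {
	Kind     string          `json:"kind"`
	Name     string          `json:"name"`
	Channels json.RawMessage `json:"channels"`
}

// ChannelEntry is a reduced, hypothetical typed form of one channel.
type ChannelEntry struct {
	Name string `json:"name"`
}

// decodeWebSubChannels unmarshals the envelope first, then decodes Channels
// into typed entries, so no map[string]interface{} assertions are needed.
func decodeWebSubChannels(raw []byte) (string, []ChannelEntry, error) {
	var ecr EventChannelResource
	if err := json.Unmarshal(raw, &ecr); err != nil {
		return "", nil, fmt.Errorf("decode envelope: %w", err)
	}
	if ecr.Kind != "WebSubApi" {
		return "", nil, fmt.Errorf("unexpected kind %q", ecr.Kind)
	}
	var channels []ChannelEntry
	if len(ecr.Channels) > 0 {
		if err := json.Unmarshal(ecr.Channels, &channels); err != nil {
			return "", nil, fmt.Errorf("decode channels for %q: %w", ecr.Name, err)
		}
	}
	return ecr.Name, channels, nil
}

func main() {
	raw := []byte(`{"kind":"WebSubApi","name":"demo","channels":[{"name":"prices"}]}`)
	name, channels, err := decodeWebSubChannels(raw)
	fmt.Println(name, channels, err)
}
```

A shape mismatch then surfaces as a decode error with context instead of a silently skipped type assertion.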

In `@gateway/gateway-controller/pkg/utils/api_deployment.go`:
- Around line 657-746: The comments mentioning supported kinds are out of date:
update the function-level doc for resolveVhostSentinels and the
APIDeploymentParams.Kind comment to include WebBrokerApi; then remove the
duplicated vhost-handling blocks by extracting the common logic into a helper
(e.g., resolveVhostsInPlace) that accepts/returns the anonymous VHosts struct
and routerCfg, and call that helper from each type assertion branch
(api.RestAPI, api.WebSubAPI, api.WebBrokerApi) to set c.Spec.Vhosts and *cfg
accordingly.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: b29a95a6-f3e4-49ea-9786-692ea01fe929

📥 Commits

Reviewing files that changed from the base of the PR and between 6154540 and fc0ea91.

⛔ Files ignored due to path filters (1)
  • go.work is excluded by !**/*.work
📒 Files selected for processing (19)
  • docs/rest-apis/gateway/schemas.md
  • docs/rest-apis/gateway/webbroker-api-management.md
  • docs/rest-apis/gateway/websub-api-management.md
  • event-gateway/gateway-runtime/cmd/event-gateway/plugins.go
  • event-gateway/gateway-runtime/configs/channels-webbrokerapi-example.yaml
  • event-gateway/gateway-runtime/configs/channels.yaml
  • event-gateway/gateway-runtime/configs/config.toml
  • event-gateway/gateway-runtime/internal/binding/types.go
  • event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/consumer.go
  • event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go
  • event-gateway/gateway-runtime/internal/connectors/types.go
  • event-gateway/gateway-runtime/internal/hub/hub.go
  • event-gateway/gateway-runtime/internal/runtime/runtime.go
  • event-gateway/gateway-runtime/internal/xdsclient/handler.go
  • gateway/gateway-controller/api/management-openapi.yaml
  • gateway/gateway-controller/pkg/api/management/generated.go
  • gateway/gateway-controller/pkg/policyxds/event_channel_translator.go
  • gateway/gateway-controller/pkg/storage/sql_store.go
  • gateway/gateway-controller/pkg/utils/api_deployment.go
✅ Files skipped from review due to trivial changes (2)
  • docs/rest-apis/gateway/webbroker-api-management.md
  • gateway/gateway-controller/pkg/api/management/generated.go
🚧 Files skipped from review as they are similar to previous changes (10)
  • event-gateway/gateway-runtime/cmd/event-gateway/plugins.go
  • event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/consumer.go
  • event-gateway/gateway-runtime/configs/channels-webbrokerapi-example.yaml
  • event-gateway/gateway-runtime/internal/binding/types.go
  • gateway/gateway-controller/pkg/storage/sql_store.go
  • event-gateway/gateway-runtime/configs/config.toml
  • event-gateway/gateway-runtime/configs/channels.yaml
  • gateway/gateway-controller/pkg/policyxds/event_channel_translator.go
  • gateway/gateway-controller/api/management-openapi.yaml
  • event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go

Comment thread docs/rest-apis/gateway/schemas.md Outdated
Comment thread docs/rest-apis/gateway/schemas.md
Comment thread docs/rest-apis/gateway/websub-api-management.md
Comment thread event-gateway/gateway-runtime/internal/connectors/types.go
Comment on lines +564 to +591
// ProcessConnectionInitResponse applies on_connection_init.response policies during connection handshake.
// Used by protocol mediation (WebBrokerApi) for response customization during handshake.
// Returns the (possibly mutated) message.
func (h *Hub) ProcessConnectionInitResponse(ctx context.Context, bindingName string, msg *connectors.Message) (*connectors.Message, error) {
	binding := h.GetBinding(bindingName)
	if binding == nil {
		return nil, fmt.Errorf("binding not found: %s", bindingName)
	}

	// Apply connection_init response policies if present.
	// Currently using OutboundChainKey for on_connection_init.response
	// In the future, we could add a dedicated field to ChannelBinding if needed.
	if binding.OutboundChainKey != "" {
		chain := h.engine.GetChain(binding.OutboundChainKey)
		if chain != nil {
			reqHeaderCtx := MessageToRequestHeaderContext(msg, binding)
			result, err := h.engine.ExecuteRequestHeaderPolicies(ctx, binding.OutboundChainKey, reqHeaderCtx.SharedContext, reqHeaderCtx)
			if err != nil {
				return nil, fmt.Errorf("connection_init response policy execution failed: %w", err)
			}
			if err := ApplyRequestHeaderResult(result, msg); err != nil {
				return nil, fmt.Errorf("failed to apply connection_init response result: %w", err)
			}
		}
	}

	return msg, nil
}

Contributor

@coderabbitai coderabbitai Bot May 13, 2026

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Missing short-circuit check in response policy execution.

Lines 580-586 execute header policies but do not check result.ShortCircuited. Every other policy execution path in this file (e.g., ProcessSubscribe, ProcessInbound, ProcessByChainKey) checks the short-circuit flag and returns early if set. The omission here is inconsistent and may prevent policies from rejecting connection handshakes during the response phase.

If connection-init response policies intentionally cannot short-circuit, document this constraint. Otherwise, add the short-circuit check:

🔒 Proposed fix to add short-circuit handling
 		result, err := h.engine.ExecuteRequestHeaderPolicies(ctx, binding.OutboundChainKey, reqHeaderCtx.SharedContext, reqHeaderCtx)
 		if err != nil {
 			return nil, fmt.Errorf("connection_init response policy execution failed: %w", err)
 		}
+		if result.ShortCircuited {
+			logShortCircuit("Connection init response short-circuited", bindingName, binding.OutboundChainKey, result.ImmediateResponse)
+			// Return error or handle short-circuit appropriately for response phase
+			return nil, fmt.Errorf("connection init response short-circuited by policy")
+		}
 		if err := ApplyRequestHeaderResult(result, msg); err != nil {

Note: The method signature returns (*Message, error) not (*Message, bool, error), so the caller cannot detect short-circuit via a bool. You may need to adjust the return signature or handle short-circuit as an error at this layer.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@event-gateway/gateway-runtime/internal/hub/hub.go` around lines 564 - 591,
ProcessConnectionInitResponse is missing a check for policy short-circuiting
after executing request header policies for binding.OutboundChainKey: after
calling h.engine.ExecuteRequestHeaderPolicies (and before
ApplyRequestHeaderResult) inspect result.ShortCircuited and handle it
consistently with other flows (return early). Update
ProcessConnectionInitResponse to treat a short-circuited result the same way as
other methods — either return an appropriate error (to reject the handshake) or
change the function signature if the caller must distinguish short-circuit
success vs. failure; ensure you reference the result returned by
ExecuteRequestHeaderPolicies and use result.ShortCircuited to decide the early
return.

Member Author

Will check this and consider this as a future improvement

Contributor

@senthuran16, understood. Would you like me to open a follow-up GitHub issue to track the missing short-circuit check in ProcessConnectionInitResponse so it isn't lost?


🧠 Learnings used
Learnt from: Krishanx92
Repo: wso2/api-platform PR: 1496
File: gateway/gateway-controller/pkg/policyxds/combined_cache.go:559-559
Timestamp: 2026-03-26T09:58:10.495Z
Learning: When reviewing Go code in this repo, usages of `cache.NewTestRawResponse` from `github.com/envoyproxy/go-control-plane/pkg/cache/v3` (Go-Control-Plane v0.14.0+) are valid for production use. Do not flag it as a missing/non-existent function or as “test-only”; the function is publicly exported with signature `func NewTestRawResponse(req *discovery.DiscoveryRequest, version string, resources []types.ResourceWithTTL) *RawResponse`.

Comment on lines 131 to +136
wsMux := http.NewServeMux()
websubMux := http.NewServeMux()

// Store wsMux for dynamic bindings
r.wsMux = wsMux

Contributor

@coderabbitai coderabbitai Bot May 13, 2026

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Constructor-initialized wsMux is discarded.

New already constructs r.wsMux = http.NewServeMux() (Line 108), but LoadChannels allocates a fresh wsMux locally at Line 131 and reassigns r.wsMux at Line 135. If AddWebBrokerApiBinding is called before LoadChannels runs (or LoadChannels is skipped in pure-xDS mode), receivers will register on the original mux while Run() later wires a different mux into the WebSocket server — or vice versa.

Recommend reusing r.wsMux instead of allocating a new one (and the same for the WebSub mux, which is currently shadowed by a local http.NewServeMux()).

🔧 Suggested change
-	// Create shared HTTP muxes for port sharing.
-	wsMux := http.NewServeMux()
-	websubMux := http.NewServeMux()
-
-	// Store wsMux for dynamic bindings
-	r.wsMux = wsMux
+	// Reuse the muxes created in New() so static and dynamic bindings
+	// share the same routing table.
+	wsMux := r.wsMux
+	websubMux := http.NewServeMux() // TODO: align with r.websubMux (DynamicMux) for dynamic bindings
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@event-gateway/gateway-runtime/internal/runtime/runtime.go` around lines 131 -
136, The constructor-created r.wsMux (and r.websubMux) is being shadowed in
LoadChannels by local wsMux and websubMux variables; replace the local
allocations in LoadChannels with reuse of the existing r.wsMux and r.websubMux
so bindings register on the same mux instance the New constructor created (and
which Run later wires into the WebSocket server), ensuring
AddWebBrokerApiBinding and other registration calls always target
r.wsMux/r.websubMux rather than a locally scoped http.NewServeMux().

Member Author

Will check this and consider this as a future improvement

Contributor

@senthuran16, understood. Would you like me to open a follow-up GitHub issue to track the mux reuse alignment (r.wsMux/r.websubMux in LoadChannels) so it isn't lost?


Comment on lines +294 to +403
	// Process WebBrokerApi bindings (protocol mediation).
	for _, wbb := range parseResult.WebBrokerApiBindings {
		vhost := defaultVhost(wbb.Vhost)

		// Build API-level policy chains.
		apiConnInitReqKey, _, _, _, err := r.buildWebBrokerApiPolicyChains(wbb, vhost, "")
		if err != nil {
			return fmt.Errorf("failed to build API-level chains for WebBrokerApi %q: %w", wbb.Name, err)
		}

		// Build per-channel policy chains and collect topics.
		channelChains := make(map[string]ChannelPolicyChains)
		allTopics := []string{}                   // All topics (produce + consume) for ensuring they exist
		topicToChannel := make(map[string]string) // Only consume topics for subscription mapping

		for channelName, channelDef := range wbb.Channels {
			connInitReqKey, connInitRespKey, produceKey, consumeKey, err := r.buildWebBrokerApiPolicyChains(wbb, vhost, channelName)
			if err != nil {
				return fmt.Errorf("failed to build chains for channel %q in WebBrokerApi %q: %w", channelName, wbb.Name, err)
			}

			channelChains[channelName] = ChannelPolicyChains{
				ConnInitReqKey:  connInitReqKey,
				ConnInitRespKey: connInitRespKey,
				ProduceKey:      produceKey,
				ConsumeKey:      consumeKey,
			}

			// Extract ALL topics (produce + consume) to ensure they exist in Kafka
			allChannelTopics := extractAllTopicsFromChannelPolicies(channelName, channelDef)
			allTopics = append(allTopics, allChannelTopics...)

			// Extract ONLY consume topics for subscription mapping
			consumeTopics := extractTopicsFromChannelPolicies(channelName, channelDef)
			for _, topic := range consumeTopics {
				topicToChannel[topic] = channelName
			}
		}

		// Register binding in hub.
		r.hub.RegisterBinding(hub.ChannelBinding{
			APIID:             wbb.APIID,
			Name:              wbb.Name,
			Mode:              "protocol-mediation",
			Context:           wbb.Context,
			Version:           wbb.Version,
			Vhost:             vhost,
			SubscribeChainKey: apiConnInitReqKey,
			InboundChainKey:   "", // Determined per-channel
			OutboundChainKey:  "", // Determined per-channel
		})

		// Create broker-driver.
		brokerDriverType := wbb.BrokerDriver.Type
		if brokerDriverType == "" {
			brokerDriverType = "kafka"
		}
		brokerDriverConfig := wbb.BrokerDriver.Config
		if brokerDriverConfig == nil {
			brokerDriverConfig = wbb.BrokerDriver.Properties
		}
		brokerDriver, err := r.registry.CreateBrokerDriver(brokerDriverType, brokerDriverConfig)
		if err != nil {
			return fmt.Errorf("failed to create broker-driver for WebBrokerApi %q: %w", wbb.Name, err)
		}
		r.brokerDrivers = append(r.brokerDrivers, brokerDriver)

		hasWS = true

		ch := connectors.ChannelInfo{
			Name:    wbb.Name,
			Mode:    "protocol-mediation",
			Context: wbb.Context,
			Version: wbb.Version,
			Vhost:   vhost,
			Topics:  allTopics,
			Metadata: map[string]interface{}{
				"channelChains":  channelChainsToMap(channelChains),
				"topicToChannel": topicToChannel,
				"channelNames":   getChannelNames(wbb.Channels),
			},
		}

		// Create WebBrokerApi receiver.
		receiverType := wbb.Receiver.Type
		if receiverType == "" {
			receiverType = "websocket"
		}

		ep, err := r.registry.CreateReceiver(receiverType+"-broker-api", connectors.ReceiverConfig{
			Channel:      ch,
			Processor:    r.hub,
			BrokerDriver: brokerDriver,
			RuntimeID:    r.cfg.RuntimeID,
			Mux:          wsMux,
		})
		if err != nil {
			return fmt.Errorf("failed to create receiver for WebBrokerApi %q: %w", wbb.Name, err)
		}
		r.receivers = append(r.receivers, ep)

		slog.Info("Registered WebBrokerApi binding",
			"name", wbb.Name,
			"context", wbb.Context,
			"version", wbb.Version,
			"receiver", receiverType,
			"topics", allTopics,
			"channels", len(wbb.Channels),
		)
	}

Contributor

@coderabbitai coderabbitai Bot May 13, 2026

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Two consistency concerns in the WebBrokerApi LoadChannels path.

  1. Channel-level chains are built (Lines 305-320) and surfaced via Metadata["channelChains"], but only the API-level apiConnInitReqKey is stored on the hub ChannelBinding (Lines 334-344). unregisterBindingChains (Lines 929-959) only knows about SubscribeChainKey/InboundChainKey/etc.; per-channel keys for WebBrokerApi will leak on removal. Consider populating ChannelChainKeys (or an equivalent) so they participate in cleanup.

  2. AddWebBrokerApiBinding reads only wbb.BrokerDriver.Config (Line 1205) while this static path also falls back to wbb.BrokerDriver.Properties (Lines 351-354). Recommend aligning both paths to use the same resolution helper to avoid surprises when the YAML uses properties.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@event-gateway/gateway-runtime/internal/runtime/runtime.go` around lines 294 -
403, Channel-level policy chain keys built by buildWebBrokerApiPolicyChains are
placed only into Metadata["channelChains"] but not recorded on the hub
ChannelBinding, so unregisterBindingChains cannot clean them up; update the
binding registration to populate a ChannelChainKeys (or similar) field on
hub.ChannelBinding with per-channel ConnInitReq/ConnInitResp/Produce/Consume
keys (derive from channelChains or channelChainsToMap) so
unregisterBindingChains can remove them. Also unify BrokerDriver config
resolution: instead of directly using wbb.BrokerDriver.Config with a fallback to
Properties here (where you call r.registry.CreateBrokerDriver), reuse the same
broker-driver config resolution helper used by AddWebBrokerApiBinding (the
existing code path that reads wbb.BrokerDriver.Config) so both static and
dynamic load paths resolve Config/Properties identically before calling
r.registry.CreateBrokerDriver.
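
Both items could be addressed with two small helpers, sketched below under stated assumptions: BrokerDriverSpec is a hypothetical reduction of the binding's broker-driver section (the real Config and Properties types are not shown in this thread and are modeled as map[string]any), and flattenChainKeys is an illustrative name for collecting per-channel keys so cleanup code can see them.

```go
package main

import (
	"fmt"
	"sort"
)

// BrokerDriverSpec is a hypothetical stand-in for wbb.BrokerDriver.
type BrokerDriverSpec struct {
	Type       string
	Config     map[string]any
	Properties map[string]any
}

// resolveBrokerDriver applies the same defaulting for static and dynamic
// paths: type defaults to "kafka", and Config falls back to Properties.
func resolveBrokerDriver(spec BrokerDriverSpec) (string, map[string]any) {
	driverType := spec.Type
	if driverType == "" {
		driverType = "kafka"
	}
	cfg := spec.Config
	if cfg == nil {
		cfg = spec.Properties
	}
	return driverType, cfg
}

// ChannelPolicyChains mirrors the four per-channel keys in the quoted code.
type ChannelPolicyChains struct {
	ConnInitReqKey  string
	ConnInitRespKey string
	ProduceKey      string
	ConsumeKey      string
}

// flattenChainKeys returns every non-empty per-channel chain key in sorted
// order, suitable for storing on the binding for later unregistration.
func flattenChainKeys(chains map[string]ChannelPolicyChains) []string {
	var keys []string
	for _, c := range chains {
		for _, k := range []string{c.ConnInitReqKey, c.ConnInitRespKey, c.ProduceKey, c.ConsumeKey} {
			if k != "" {
				keys = append(keys, k)
			}
		}
	}
	sort.Strings(keys)
	return keys
}

func main() {
	driverType, cfg := resolveBrokerDriver(BrokerDriverSpec{
		Properties: map[string]any{"brokers": []string{"kafka:29092"}},
	})
	fmt.Println(driverType, cfg)
	fmt.Println(flattenChainKeys(map[string]ChannelPolicyChains{
		"prices": {ProduceKey: "prices-produce", ConsumeKey: "prices-consume"},
	}))
}
```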

Member Author

Will check this and consider this as a future improvement

Contributor

@senthuran16, understood. Would you like me to open a follow-up GitHub issue to track these two items so they aren't lost?

  1. Per-channel chain key cleanup: register per-channel ConnInitReq/ConnInitResp/Produce/Consume keys on hub.ChannelBinding (or equivalent) so unregisterBindingChains can clean them up on removal.
  2. BrokerDriver config resolution parity: align the static LoadChannels path (Config with a Properties fallback) with the dynamic AddWebBrokerApiBinding path so both resolve broker-driver config identically.

Comment on lines +1123 to +1269
// AddWebBrokerApiBinding dynamically adds a WebBrokerApi binding at runtime.
func (r *Runtime) AddWebBrokerApiBinding(wbb binding.WebBrokerApiBinding) error {
	r.mu.Lock()

	vhost := defaultVhost(wbb.Vhost)

	// Build API-level policy chains.
	apiConnInitReqKey, _, _, _, err := r.buildWebBrokerApiPolicyChains(wbb, vhost, "")
	if err != nil {
		r.mu.Unlock()
		return fmt.Errorf("failed to build API-level chains for WebBrokerApi %q: %w", wbb.Name, err)
	}

	// Build per-channel policy chains and collect topics.
	channelChains := make(map[string]ChannelPolicyChains)
	allTopics := []string{}                             // All topics (produce + consume) for ensuring they exist
	topicToChannel := make(map[string]string)           // Only consume topics for subscription mapping
	channelTopics := make(map[string]map[string]string) // Channel-level topic mappings (produceTo, consumeFrom)

	for channelName, channelDef := range wbb.Channels {
		connInitReqKey, connInitRespKey, produceKey, consumeKey, err := r.buildWebBrokerApiPolicyChains(wbb, vhost, channelName)
		if err != nil {
			r.mu.Unlock()
			return fmt.Errorf("failed to build chains for channel %q in WebBrokerApi %q: %w", channelName, wbb.Name, err)
		}

		channelChains[channelName] = ChannelPolicyChains{
			ConnInitReqKey:  connInitReqKey,
			ConnInitRespKey: connInitRespKey,
			ProduceKey:      produceKey,
			ConsumeKey:      consumeKey,
		}

		// Store channel topic mappings (use defaults if not specified)
		topicMapping := make(map[string]string)
		if channelDef.ProduceTo != nil && channelDef.ProduceTo.Topic != "" {
			topicMapping["produceTo"] = channelDef.ProduceTo.Topic
		} else {
			// Default: use normalized channel name for producing
			topicMapping["produceTo"] = binding.NormalizeTopicSegment(channelName)
		}
		if channelDef.ConsumeFrom != nil && channelDef.ConsumeFrom.Topic != "" {
			topicMapping["consumeFrom"] = channelDef.ConsumeFrom.Topic
		} else {
			// Default: use normalized channel name for consuming
			topicMapping["consumeFrom"] = binding.NormalizeTopicSegment(channelName)
		}
		channelTopics[channelName] = topicMapping

		// Extract ALL topics (produce + consume) to ensure they exist in Kafka
		allChannelTopics := extractAllTopicsFromChannelPolicies(channelName, channelDef)
		allTopics = append(allTopics, allChannelTopics...)

		// Extract ONLY consume topics for subscription mapping
		consumeTopics := extractTopicsFromChannelPolicies(channelName, channelDef)
		for _, topic := range consumeTopics {
			topicToChannel[topic] = channelName
		}

		slog.Info("Built policy chains for WebBrokerApi channel",
			"api", wbb.Name,
			"channel", channelName,
			"topics", allChannelTopics)
	}

	r.hub.RegisterBinding(hub.ChannelBinding{
		APIID:             wbb.APIID,
		Name:              wbb.Name,
		Mode:              "protocol-mediation",
		Context:           wbb.Context,
		Version:           wbb.Version,
		Vhost:             vhost,
		SubscribeChainKey: apiConnInitReqKey,
		InboundChainKey:   "", // Determined per-channel
		OutboundChainKey:  "", // Determined per-channel
	})

	// Create broker-driver.
	brokerDriverType := "kafka"
	if wbb.BrokerDriver.Type != "" {
		brokerDriverType = wbb.BrokerDriver.Type
	}
	brokerDriver, err := r.registry.CreateBrokerDriver(brokerDriverType, wbb.BrokerDriver.Config)
	if err != nil {
		r.mu.Unlock()
		return fmt.Errorf("failed to create broker-driver for WebBrokerApi %q: %w", wbb.Name, err)
	}
	r.activeBrokerDrivers[wbb.Name] = brokerDriver

	ch := connectors.ChannelInfo{
		Name:    wbb.Name,
		Mode:    "protocol-mediation",
		Context: wbb.Context,
		Version: wbb.Version,
		Vhost:   vhost,
		Topics:  allTopics,
		Metadata: map[string]interface{}{
			"channelChains":  channelChainsToMap(channelChains),
			"topicToChannel": topicToChannel,
			"channelNames":   getChannelNames(wbb.Channels),
			"channelTopics":  channelTopics,
		},
	}

	// Determine receiver type (websocket, sse, etc.)
	receiverType := "websocket-broker-api"
	if wbb.Receiver.Type != "" {
		receiverType = wbb.Receiver.Type + "-broker-api"
	}

	receiver, err := r.registry.CreateReceiver(receiverType, connectors.ReceiverConfig{
		Channel:      ch,
		Processor:    r.hub,
		BrokerDriver: brokerDriver,
		RuntimeID:    r.cfg.RuntimeID,
		Mux:          r.wsMux,
	})
	if err != nil {
		r.mu.Unlock()
		return fmt.Errorf("failed to create receiver for WebBrokerApi %q: %w", wbb.Name, err)
	}
	r.activeReceivers[wbb.Name] = receiver

	startNow := r.running
	startCtx := r.runCtx
	r.mu.Unlock()

	// If runtime is already running, start the receiver immediately.
	if startNow {
		if startCtx == nil {
			startCtx = context.Background()
		}
		if err := r.startReceiverWithRetry(startCtx, wbb.Name, receiver); err != nil {
			return fmt.Errorf("failed to start receiver for WebBrokerApi %q: %w", wbb.Name, err)
		}
	}

	slog.Info("Dynamically added WebBrokerApi binding",
		"name", wbb.Name,
		"context", wbb.Context,
		"version", wbb.Version,
		"receiver_type", receiverType,
		"channels", len(wbb.Channels),
		"topics", allTopics)

	return nil
}

Contributor

@coderabbitai coderabbitai Bot May 13, 2026

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Missing Kafka topic tracking parity with AddWebSubApiBinding.

AddWebSubApiBinding records all Kafka topics in r.bindingTopics[wsb.Name] (Lines 1013-1019) so RemoveWebSubApiBinding can call bd.DeleteTopics(...) on teardown. AddWebBrokerApiBinding computes allTopics (Line 1138, populated at Line 1174) but never assigns it to r.bindingTopics, and RemoveWebBrokerApiBinding (Lines 1272-1310) does not delete Kafka topics. If parity with WebSubApi cleanup is intended, persist allTopics here and add the corresponding DeleteTopics call on removal; if intentional, a short comment explaining why WebBrokerApi topics are kept across removals would help future maintainers.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@event-gateway/gateway-runtime/internal/runtime/runtime.go` around lines 1123
- 1269, AddWebBrokerApiBinding computes allTopics but never stores them into
r.bindingTopics[wbb.Name], and RemoveWebBrokerApiBinding does not delete those
Kafka topics on teardown; update AddWebBrokerApiBinding (after building
ch/Topics metadata and before releasing the lock) to set
r.bindingTopics[wbb.Name] = allTopics, and update RemoveWebBrokerApiBinding to
call the corresponding broker driver DeleteTopics (or r.registry-created
brokerDriver.DeleteTopics/bd.DeleteTopics with r.bindingTopics[name]) during
removal cleanup (or add a clear comment if topics are intentionally retained) so
WebBrokerApi has the same topic lifecycle as
AddWebSubApiBinding/RemoveWebSubApiBinding.
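
A rough sketch of the topic lifecycle, assuming parity with WebSubApi cleanup is wanted. The topicDeleter interface and its DeleteTopics signature are assumptions (the real broker driver method may take a context or differ otherwise), and Runtime is reduced to the two maps involved.

```go
package main

import (
	"fmt"
	"sync"
)

// topicDeleter is a hypothetical view of the broker driver; the real
// DeleteTopics signature may differ.
type topicDeleter interface {
	DeleteTopics(topics []string) error
}

// fakeDriver records deletions so the example is observable.
type fakeDriver struct{ deleted []string }

func (f *fakeDriver) DeleteTopics(topics []string) error {
	f.deleted = append(f.deleted, topics...)
	return nil
}

// Runtime models only the state needed for topic tracking.
type Runtime struct {
	mu                  sync.Mutex
	bindingTopics       map[string][]string
	activeBrokerDrivers map[string]topicDeleter
}

func newRuntime() *Runtime {
	return &Runtime{
		bindingTopics:       make(map[string][]string),
		activeBrokerDrivers: make(map[string]topicDeleter),
	}
}

// addBinding stores a copy of the topics while the lock is held.
func (r *Runtime) addBinding(name string, bd topicDeleter, topics []string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.activeBrokerDrivers[name] = bd
	r.bindingTopics[name] = append([]string(nil), topics...)
}

// removeBinding clears the bookkeeping under the lock, then deletes the
// tracked topics outside it so broker I/O does not block other callers.
func (r *Runtime) removeBinding(name string) error {
	r.mu.Lock()
	bd := r.activeBrokerDrivers[name]
	topics := r.bindingTopics[name]
	delete(r.activeBrokerDrivers, name)
	delete(r.bindingTopics, name)
	r.mu.Unlock()

	if bd == nil || len(topics) == 0 {
		return nil
	}
	return bd.DeleteTopics(topics)
}

func main() {
	bd := &fakeDriver{}
	r := newRuntime()
	r.addBinding("stock-trading-v1.0", bd, []string{"stock.prices"})
	fmt.Println(r.removeBinding("stock-trading-v1.0"), bd.deleted)
}
```

If WebBrokerApi topics are meant to outlive the binding, for example because they are shared with other producers, the removal path would skip the DeleteTopics call and a comment there should say so.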

Member Author

Will check this and consider this as a future improvement

Contributor

@senthuran16, understood. Would you like me to open a GitHub issue to track the missing r.bindingTopics population and DeleteTopics call in AddWebBrokerApiBinding/RemoveWebBrokerApiBinding for future follow-up?


Comment thread event-gateway/gateway-runtime/internal/xdsclient/handler.go Outdated
Comment on lines +311 to +319
	case api.WebBrokerApi:
		apiName = c.Spec.DisplayName
		apiVersion = c.Spec.Version
		// TODO: Add validation for WebBrokerApi once validator supports it
		// validationErrors := s.validator.Validate(&c)
		// if len(validationErrors) > 0 {
		// 	s.logValidationErrors(params.Logger, apiID, apiName, validationErrors)
		// 	return nil, &ValidationErrorListError{Errors: validationErrors}
		// }
Contributor

@coderabbitai coderabbitai Bot May 13, 2026

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Track validation gap for WebBrokerApi.

Validation is bypassed for WebBrokerApi while RestAPI and WebSubAPI are both validated here. Downstream calls (e.g., validateArtifactConflicts, store/render flow) will then run against an unvalidated spec until the validator gains support for this kind. Consider tracking this with an issue so the TODO is not lost, and keep the unsupported-kind branch in validator.Validate defensive in the meantime.

Would you like me to open a tracking issue for adding WebBrokerApi support to config.Validator?

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@gateway/gateway-controller/pkg/utils/api_deployment.go` around lines 311 -
319, This branch skips validation for api.WebBrokerApi (case api.WebBrokerApi)
which leaves downstream calls like validateArtifactConflicts and the
store/render flow operating on unvalidated specs; create a tracking issue for
adding WebBrokerApi support to config.Validator and reference that issue ID in
the TODO comment, and make the unsupported-kind path in s.validator.Validate
defensive (ensure validator.Validate returns a clear validation error or
empty-but-explicit result for unknown kinds so the caller (the switch handling
api.WebBrokerApi) can either log and return a ValidationErrorListError via
s.logValidationErrors/ValidationErrorListError or otherwise abort processing
instead of proceeding silently).

Member Author

Will open an issue and track it

Contributor

@senthuran16, sounds good. If you'd like me to open the tracking issue on your behalf, just let me know and I'll do that right away. Otherwise, I'd suggest referencing the issue number in the TODO comment once created, e.g.:

case api.WebBrokerApi:
    apiName = c.Spec.DisplayName
    apiVersion = c.Spec.Version
    // TODO(#<issue-number>): Add validation for WebBrokerApi once validator supports it

🧠 Learnings used
Learnt from: Saadha123
Repo: wso2/api-platform PR: 1420
File: gateway/gateway-controller/pkg/config/policy_validator.go:149-163
Timestamp: 2026-03-20T15:22:10.323Z
Learning: In the API Gateway controller config/transform/builder code under `gateway/gateway-controller/pkg/**`, treat `ResolvePolicyVersion` in `policy_validator.go` as a validation + definition-existence helper only. At its call sites, do not pass the resolved full semver (e.g., `v1.0.0`) downstream—retain and pass the original major-only policy reference string (e.g., `v1`) to `ConvertAPIPolicyToModel`/`convertAPIPolicyToSDK`, since the policy engine expects major-only format. For empty-version handling, note that the `ResolvePolicyVersion` path is not exercised by LLM policies; those policies resolve versions through the transformer’s `policyVersionResolver` instead.

Learnt from: VirajSalaka
Repo: wso2/api-platform PR: 1425
File: gateway/gateway-controller/pkg/api/handlers/handlers.go:730-732
Timestamp: 2026-03-22T04:48:40.788Z
Learning: In the wso2/api-platform `gateway/gateway-controller` service, treat `EventHub` as DB-backed: when `s.eventHub != nil`, `s.db` is guaranteed to be non-nil. Therefore, for handlers and related code, it’s sufficient to guard on `s.eventHub == nil` (e.g., skip EventHub-dependent paths) and you should not also add redundant `s.db == nil` checks. This is based on the project invariant that memory-only (non-DB) mode is planned for removal.

Learnt from: nimsara66
Repo: wso2/api-platform PR: 1438
File: gateway/gateway-controller/pkg/utils/api_deployment.go:353-359
Timestamp: 2026-03-23T02:48:39.635Z
Learning: For secret handling during API deployment in the gateway-controller codebase, follow the intended design “resolve at runtime, persist unresolved.” When calling `saveOrUpdateConfig`, always persist the stored (unresolved) config (`storedCfg`) that still contains `$secret{...}` placeholders. Use the resolved/decrypted config (`resolvedCfg`) only for runtime purposes (e.g., values returned in `APIDeploymentResult.StoredConfig` for xDS/policy evaluation). Never persist or write `resolvedCfg` (plaintext secrets) to the DB or in-memory store—reviews should flag any changes that write decrypted secret values to storage or logs.

Learnt from: dushaniw
Repo: wso2/api-platform PR: 1463
File: gateway/gateway-controller/pkg/controlplane/sync.go:323-362
Timestamp: 2026-03-25T04:34:50.763Z
Learning: In the gateway-controller codebase, it is an accepted design to log and continue after failures in in-memory state updates (e.g., store.Update) or event-hub publishes (e.g., eventHub.PublishEvent) when the failure occurs following a successful DB upsert (e.g., UpsertConfig). Do not review-comment this as a bug or require synchronous rollback/error propagation. This applies to sync logic (e.g., processSyncStatusUpdates/processSyncDeletions), websocket event handlers (e.g., handleAPIUndeployedEvent and similar), and the REST service layer (e.g., saveOrUpdateConfig). The DB upsert rollback is intentionally not enforced because prior config state is not retained before overwriting, and gateway restart is relied on to recover from any resulting in-memory/xDS staleness via store reload from the DB.

Learnt from: VirajSalaka
Repo: wso2/api-platform PR: 1493
File: gateway/gateway-controller/tests/integration/schema_test.go:201-202
Timestamp: 2026-03-26T06:54:34.762Z
Learning: For the wso2/api-platform gateway/gateway-controller service, treat “delete and recreate” of the database (during tests/upgrades) as the intentional/accepted upgrade path for SQL schema changes. Do not raise review issues for the absence of schema migrations or SQL schema-version upgrade logic in this service’s database implementation files (e.g., sqlite.go/postgres.go), since this is a deliberate design choice.

Learnt from: Krishanx92
Repo: wso2/api-platform PR: 1500
File: gateway/gateway-controller/pkg/utils/mcp_transformer.go:150-153
Timestamp: 2026-03-27T17:26:39.573Z
Learning: In wso2/api-platform’s `gateway/gateway-controller` service, the OpenAPI spec `gateway/gateway-controller/api/management-openapi.yaml` defines the `Policy` schema’s `version` as required (under `required: [name, version]`) with pattern `'^v\d+$'`, and the generated Go model `gateway/gateway-controller/pkg/api/management/generated.go` represents `Policy.Version` as a non-pointer `string`.

During code review, ensure any PR that constructs an `api.Policy` does not omit/leave `Version` unset; leaving it empty will fail the API validation. If the PR’s change requires `Version` to become optional (or otherwise changes its required/validation semantics), you must update `management-openapi.yaml` accordingly (e.g., remove `version` from the `required` list), then regenerate the Go types so validation behavior matches the spec.

@senthuran16 senthuran16 changed the title from "[Draft] Add Implementation for Exposing Brokers as Subscribable APIs over Web-friendly Protocols" to "Add Implementation for Exposing Brokers as Subscribable APIs over Web-friendly Protocols" May 13, 2026
Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 7

🧹 Nitpick comments (2)
event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go (2)

124-138: ⚡ Quick win

Stop ignores its ctx, and per-connection consumer shutdown uses context.Background().

Stop(ctx) accepts a context but never propagates it to closeConnection, and closeConnection invokes conn.kafkaConsumer.Stop(context.Background()). If a caller passes a deadline-bound context to Stop to bound shutdown time, that deadline is silently dropped, and a slow consumer shutdown can stall the gateway. Threading ctx through closeConnection (or at minimum to the consumer Stop call during shutdown) would preserve the caller's timeout semantics.

Also applies to: 508-514

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go`
around lines 124 - 138, The Stop method on WebBrokerApiReceiver currently
ignores its ctx and closeConnection calls conn.kafkaConsumer.Stop with
context.Background(); change Stop to pass its ctx into closeConnection (e.g.,
closeConnection(ctx, conn)) and update closeConnection to accept a context
parameter and use that ctx when calling conn.kafkaConsumer.Stop(ctx) so the
caller's deadline/cancellation is honored; make the same ctx-threading change
for the other shutdown call sites referenced (the other closeConnection
invocations around the same connector).

209-232: 💤 Low value

Guard against a nil processed before dereferencing.

If ProcessConnectionInitRequest ever returns (nil, false, nil) (no short-circuit, no error, but no result), lines 227–231 will dereference processed.Headers and panic. Even if the current implementation always returns a non-nil result on the happy path, a defensive nil-check here makes the handler resilient to future contract changes in MessageProcessor. The same consideration applies to the short-circuit branch on lines 213–222.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go`
around lines 209 - 232, Guard against a nil processed result from
ProcessConnectionInitRequest: before accessing processed.Headers or
processed.Value in both the shortCircuited branch and the header-update branch,
check if processed == nil and handle it (e.g., log an error via slog or
processLogger and return a 500/appropriate response or skip header updates) to
avoid panics; update the shortCircuited block that reads
processed.Metadata/Headers/Value and the subsequent loop that updates r.Header
to first nil-check processed and perform a safe fallback path.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@event-gateway/ARCHITECTURE.md`:
- Around line 14-17: Add missing language identifiers to the fenced code blocks
in ARCHITECTURE.md: locate the blocks that contain the API examples (e.g., the
block with "POST /websub-apis → CreateWebSubAPI()" and "POST /webbroker-apis →
CreateWebBrokerApi()") and the other similar example blocks called out in the
review, and change their opening fences from ``` to ```text (or another
appropriate language tag) so markdownlint MD040 is satisfied and editors render
correctly.
- Line 332: Update the broken internal link fragment that currently points to
"#11-extensibility--plugin-architecture" so it matches the actual heading
number; replace the fragment with "#12-extensibility--plugin-architecture" (or
remove the numeric prefix entirely to use "#extensibility--plugin-architecture")
wherever the link text referencing the Extensibility/Plugin Architecture appears
to ensure the anchor resolves to the "Extensibility -- Plugin Architecture"
section.

In
`@event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go`:
- Around line 291-339: channelTopics can be empty and you currently proceed to
call e.brokerDriver.Subscribe which may fail or no-op; change the early-warning
path where len(channelTopics) == 0 to treat it as a fatal handshake error: log
an error (including channelName and connID if already created or create connID
earlier), close the WebSocket, call cancel(), and return before calling
e.brokerDriver.Subscribe so the per-connection consumer
(Subscribe/consumer.Start) is never created for an unconsumable channel; update
the handling around channelTopics, Subscribe, and brokerApiConnection creation
so the function returns early on empty topics.
- Around line 85-98: The handler registered by NewBrokerApiReceiver
(handleUpgrade) can run before Start assigns e.ctx, causing a panic when
handleUpgrade calls context.WithCancel(nil); fix by initializing the receiver's
context field (e.ctx) to context.Background() in the constructor
(NewBrokerApiReceiver) so handleUpgrade always has a non-nil context, and then
in Start replace e.ctx with the provided ctx; alternatively implement a
short-circuit in handleUpgrade to return a clear error if e.ctx is not yet
initialized—but preferred is setting e.ctx = context.Background() in the
constructor and overwriting it in Start.
- Around line 498-525: closeConnection currently closes conn.inbound and
conn.outbound while writers (readLoop and the Kafka handler) may still be
blocked or in-flight, leading to send-on-closed-channel panics; modify
closeConnection and the connection lifecycle to wait for all sender goroutines
to exit before closing channels: introduce a sync.WaitGroup or sender counter on
brokerApiConnection (used by readLoop and the Kafka handler to
increment/decrement), ensure readLoop and the Kafka outbound sender decrement
the wg when they finish (or return) and that closeConnection calls
conn.cancel(), waits for that wg to complete (or another per-connection done
signal) and only then closes conn.inbound and conn.outbound and removes the
connection; keep references to conn.cancel, kafkaConsumer.Stop, readLoop and the
Kafka handler names to find the spots to add wg.Add/Done and the wait.

In `@event-gateway/WEBBROKERAPI.md`:
- Around line 1022-1025: Update the fenced code blocks in WEBBROKERAPI.md to
include a language identifier (e.g., use ```text) so markdownlint MD040 is
satisfied; locate the plain fenced blocks that contain outputs such as
"Connected (press CTRL+C to quit)" and the other listed snippets (lines around
the occurrences you noted) and prepend the fence with an appropriate
language/token (text) for each block.
- Around line 1377-1389: The channel examples (channel name "/default" with
on_produce and on_consume entries using the map-topic policy) are placed at the
top level of the channel but must be nested under the policies key to match the
documented schema; update the entries so on_produce and on_consume live under
policies (e.g., policies: { on_produce: [...], on_consume: [...] }) for the
map-topic v1 examples and apply the same change to the other occurrences
referenced (the blocks around lines 1419-1431 and 1464-1489).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: ec30cb9d-acf2-4c57-aaf8-31dffdfab0bd

📥 Commits

Reviewing files that changed from the base of the PR and between fc0ea91 and 7e39a00.

📒 Files selected for processing (4)
  • event-gateway/ARCHITECTURE.md
  • event-gateway/WEBBROKERAPI.md
  • event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/consumer.go
  • event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/consumer.go

Comment thread event-gateway/ARCHITECTURE.md
Comment thread event-gateway/ARCHITECTURE.md Outdated
Comment on lines +85 to +98
	cfg.Mux.HandleFunc(cfg.Channel.Context, e.handleUpgrade)

	slog.Info("WebBrokerApi receiver registered HTTP handler",
		"channel", cfg.Channel.Name,
		"path", cfg.Channel.Context,
		"mode", cfg.Channel.Mode,
		"port", opts.Port)

	return e, nil
}

// Start initializes the receiver.
func (e *WebBrokerApiReceiver) Start(ctx context.Context) error {
	e.ctx = ctx
Contributor

@coderabbitai coderabbitai Bot May 13, 2026

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Race between handler registration and Start: e.ctx may be nil when an upgrade arrives.

handleUpgrade is registered on the mux inside NewBrokerApiReceiver (line 85), but e.ctx is only assigned in Start (line 98). If a WebSocket request reaches the mux before Start runs, line 297 will call context.WithCancel(nil), which panics. Consider initializing e.ctx to context.Background() in the constructor and replacing it in Start, or rejecting requests with a clear error until Start has been called.

🛡️ Suggested guard in the constructor
 	e := &WebBrokerApiReceiver{
 		channel:      cfg.Channel,
 		processor:    cfg.Processor,
 		brokerDriver: cfg.BrokerDriver,
 		opts:         opts,
 		connections:  make(map[string]*brokerApiConnection),
+		ctx:          context.Background(),
 	}
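
The failure mode can be reproduced in isolation. The sketch below uses a hypothetical, stripped-down `receiver` rather than the real `WebBrokerApiReceiver`; it relies only on the standard library behavior that `context.WithCancel` panics when given a nil parent.

```go
package main

import (
	"context"
	"fmt"
)

// receiver is a stripped-down stand-in holding only the context field.
type receiver struct{ ctx context.Context }

// newReceiver optionally applies the suggested constructor guard.
func newReceiver(initCtx bool) *receiver {
	r := &receiver{}
	if initCtx {
		r.ctx = context.Background()
	}
	return r
}

// handleUpgrade derives a per-connection context and reports whether
// doing so panicked (as it does with a nil parent).
func (r *receiver) handleUpgrade() (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	_, cancel := context.WithCancel(r.ctx)
	cancel()
	return false
}

func main() {
	fmt.Println("without guard, panicked:", newReceiver(false).handleUpgrade())
	fmt.Println("with guard, panicked:", newReceiver(true).handleUpgrade())
}
```

Note that a connection context derived from `context.Background()` before `Start` runs is not tied to the receiver's lifetime, so rejecting early requests may be the stricter option.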
Member Author

Will check and consider this as a future improvement

Contributor

@senthuran16, understood. Would you like me to open a GitHub issue to track this so it doesn't get lost? It would capture the race condition between handler registration in NewBrokerApiReceiver and context assignment in Start, with a reference back to this PR.


Comment on lines +498 to +525
// closeConnection closes a connection and cleans up resources.
func (e *WebBrokerApiReceiver) closeConnection(conn *brokerApiConnection) {
	conn.mu.Lock()
	if conn.closed {
		conn.mu.Unlock()
		return
	}
	conn.closed = true
	conn.mu.Unlock()

	conn.cancel()

	if conn.kafkaConsumer != nil {
		if err := conn.kafkaConsumer.Stop(context.Background()); err != nil {
			slog.Error("Failed to stop per-connection consumer", "connID", conn.connID, "error", err)
		}
	}

	close(conn.inbound)
	close(conn.outbound)
	conn.ws.Close()

	e.mu.Lock()
	delete(e.connections, conn.connID)
	e.mu.Unlock()

	slog.Info("WebSocket connection closed", "connID", conn.connID, "channel", e.channel.Name)
}
Contributor

@coderabbitai coderabbitai Bot May 13, 2026

⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify whether the Kafka consumer's Stop() is documented to drain in-flight callbacks
# before returning. The receiver's contract determines whether close(outbound) is safe.
fd -t f 'consumer.go' event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka -x cat {}
ast-grep --pattern $'func ($_ *$_) Stop($_ $_) error {
  $$$
}'
rg -nP --type=go -C3 '\bSubscribe\s*\(' event-gateway/gateway-runtime/internal/connectors

Repository: wso2/api-platform

Length of output: 18784


🏁 Script executed:

cat -n event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go | head -450 | tail -100

Repository: wso2/api-platform

Length of output: 3874


🏁 Script executed:

cat -n event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go | sed -n '350,430p'

Repository: wso2/api-platform

Length of output: 3040


🏁 Script executed:

cat -n event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go | sed -n '480,540p'

Repository: wso2/api-platform

Length of output: 1613


🏁 Script executed:

cat -n event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go | sed -n '310,330p'

Repository: wso2/api-platform

Length of output: 841


Race condition: channels closed while senders are active.

closeConnection closes conn.inbound and conn.outbound (lines 516–517) with unguarded senders still potentially in flight:

  • readLoop is the sole writer to inbound. The context check at lines 360–362 is non-blocking; ReadMessage() at line 365 does not respond to context cancellation. After cancel() is called at line 508, readLoop may remain blocked in ReadMessage(). When it finally unblocks and attempts to send at line 391, the channel may already be closed, causing a panic.

  • The Kafka handler (lines 314–323) is the sole writer to outbound. Although conn.cancel() is called first at line 508 and kafkaConsumer.Stop() blocks until the consumer loop exits via wg.Wait(), the handler's context is the consumer loop's context—not the connection's context. A handler in-flight when the connection context is cancelled may still execute and attempt to send to an already-closed outbound channel, causing a panic.

Consider a synchronization pattern: wait for all senders to exit before closing channels. For example, track active sends with a counter or WaitGroup and verify all goroutines have exited before proceeding to close().
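
A minimal sketch of that pattern follows: senders are tracked by a `sync.WaitGroup`, every send also selects on the connection context, and the channel is closed only after all senders have exited. The `conn` type and its methods are hypothetical and much simpler than `brokerApiConnection`.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// conn is a simplified stand-in for a per-connection state struct.
type conn struct {
	ctx      context.Context
	cancel   context.CancelFunc
	senders  sync.WaitGroup
	outbound chan string
}

func newConn() *conn {
	ctx, cancel := context.WithCancel(context.Background())
	return &conn{ctx: ctx, cancel: cancel, outbound: make(chan string)}
}

// send delivers msg unless the connection is shutting down.
// It must only be called from goroutines tracked by senders.
func (c *conn) send(msg string) bool {
	select {
	case c.outbound <- msg:
		return true
	case <-c.ctx.Done():
		return false
	}
}

// startSender registers the goroutine before starting it, so close cannot miss it.
func (c *conn) startSender(msgs []string) {
	c.senders.Add(1)
	go func() {
		defer c.senders.Done()
		for _, m := range msgs {
			if !c.send(m) {
				return
			}
		}
	}()
}

// close cancels first, waits for every sender to exit, and only then closes the channel.
func (c *conn) close() {
	c.cancel()
	c.senders.Wait()
	close(c.outbound)
}

func main() {
	c := newConn()
	c.startSender([]string{"a", "b", "c"})
	fmt.Println(<-c.outbound)
	c.close()
	_, ok := <-c.outbound
	fmt.Println("channel open:", ok)
}
```

This sketch does not cover a reader blocked in `ReadMessage()`, which ignores context cancellation; in the real connector the WebSocket would probably need to be closed (or given a read deadline) before waiting, so that the read loop can return and release the WaitGroup.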

Member Author

Will check on this and consider as a future improvement

Contributor

@senthuran16, understood. To ensure this is tracked, would you like me to open a GitHub issue to capture the race condition in closeConnection (unguarded channel closures while readLoop and the Kafka outbound handler may still be active)? That way it won't be lost before a follow-up PR.


Comment thread event-gateway/WEBBROKERAPI.md Outdated
Comment thread event-gateway/WEBBROKERAPI.md Outdated
Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@event-gateway/ARCHITECTURE.md`:
- Around line 450-455: The table of WebSub phases in ARCHITECTURE.md is missing
the Unsubscribe phase; update the Phase/Purpose/When Applied rows to either add
a fourth row named "Unsubscribe" (Purpose: Authentication/authorization and
cleanup; When Applied: During unsubscription request to POST /hub or when
subscriber revokes) or explicitly mark unsubscription as out-of-scope with a
note; ensure you reference the existing table context and the "Subscribe",
"Inbound", and "Outbound" phase names so the new row/note stays consistent with
the rest of the document.

In `@event-gateway/README.md`:
- Around line 248-287: The example request body uses the old "broker" and
"allChannels" keys which don't match the WebBrokerApi schema; update the sample
to use "broker-driver" instead of "broker", add a top-level "policies" array
(instead of "allChannels"), and move per-channel policy definitions under each
channel's "policies" key (e.g., in the "prices" channel include "policies": []
and remove "on_connection_init"/"on_produce"/"on_consume" at the top level),
ensuring the "broker-driver" object contains "name", "type", and "properties" as
shown so the payload matches the architecture/reference docs schema.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: fa51d0c5-7f3d-4d06-b886-e8304f43fe6c

📥 Commits

Reviewing files that changed from the base of the PR and between 7e39a00 and 8e0582d.

📒 Files selected for processing (4)
  • event-gateway/ARCHITECTURE.md
  • event-gateway/README.md
  • event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go
  • event-gateway/gateway-runtime/internal/xdsclient/handler.go
🚧 Files skipped from review as they are similar to previous changes (2)
  • event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go
  • event-gateway/gateway-runtime/internal/xdsclient/handler.go

Comment thread event-gateway/ARCHITECTURE.md
Comment thread event-gateway/README.md
tharindu1st
tharindu1st previously approved these changes May 13, 2026
Development

Successfully merging this pull request may close these issues.

[Task]: Protocol mediation

2 participants