Skip to content

Commit 2f49e42

Browse files
committed
fix spec
1 parent 749d450 commit 2f49e42

16 files changed

Lines changed: 533 additions & 366 deletions

File tree

event-gateway/README.md

Lines changed: 190 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,18 @@ The Event Gateway is a lightweight, extensible runtime for managing event-driven
2525

2626
- **WebSubApi** — Multi-channel pub/sub API. Publishers send events via a webhook receiver; subscribers register callbacks at a hub endpoint. Each channel maps to a Kafka topic.
2727
- **Protocol Mediation** — Bridges WebSocket clients to Kafka topics (1:1 passthrough).
28-
- **Policies** — Pluggable enforcement at three points: `subscribe` (hub requests), `inbound` (event ingress), `outbound` (event delivery).
28+
- **Policies** — Pluggable enforcement at four points per channel:
29+
30+
| Policy point | YAML key | Triggered when |
31+
|---|---|---|
32+
| `on_subscription` | `subscribe` | A client subscribes at the hub |
33+
| `on_unsubscription` | `unsubscribe` | A client unsubscribes at the hub |
34+
| `on_message_received` | `inbound` | An event is published via the webhook receiver |
35+
| `on_message_delivery` | `outbound` | An event is delivered to a subscriber callback |
36+
37+
Policies can be applied at two scopes:
38+
- **`policies`** — applied uniformly to every channel in the API (e.g., authentication)
39+
- **`channels.<name>`** — applied only to a specific named channel (e.g., RBAC per topic)
2940

3041
## Prerequisites
3142

@@ -103,7 +114,12 @@ Two Postman collections are provided in [`spec/postman/`](spec/postman/):
103114

104115
#### Step 1: Create a WebSub API (Control Plane collection)
105116

106-
Use the **"Create Repo Watcher"** request. This registers a WebSub API with three channels (`issues`, `pull-requests`, `commits`) via the gateway controller, which pushes the configuration to the event gateway over xDS.
117+
Use the **"Create Repo Watcher"** request. This registers a WebSub API with two channels (`issues`, `pull-requests`) via the gateway controller, which pushes the configuration to the event gateway over xDS.
118+
119+
The spec uses two policy scopes:
120+
121+
- **`policies`** — API-wide policies applied to every channel (e.g., authentication on every subscribe/unsubscribe)
122+
- **`channels`** — Per-channel policies applied only to the named channel (e.g., RBAC per topic)
107123

108124
```
109125
POST http://localhost:9090/api/management/v0.9/websub-apis
@@ -120,23 +136,39 @@ Content-Type: application/json
120136
"displayName": "repo-watcher",
121137
"version": "v1.0",
122138
"context": "/repos",
123-
"hub": {
124-
"channels": [
125-
{ "name": "issues" },
126-
{ "name": "pull-requests" },
127-
{ "name": "commits" }
128-
],
129-
"policies": [
130-
{
131-
"name": "basic-auth",
132-
"version": "v1",
133-
"params": {
134-
"username": "admin",
135-
"password": "admin"
136-
}
139+
140+
"policies": {
141+
"on_subscription": [],
142+
"on_unsubscription": [],
143+
"on_message_received": [],
144+
"on_message_delivery": []
145+
},
146+
147+
"channels": {
148+
"issues": {
149+
"policies": {
150+
"on_subscription": [
151+
{
152+
"name": "rbac",
153+
"version": "v1",
154+
"params": { "allowedRoles": ["admin", "issue-manager"] }
155+
}
156+
]
157+
}
158+
},
159+
"pull-requests": {
160+
"policies": {
161+
"on_subscription": [
162+
{
163+
"name": "rbac",
164+
"version": "v1",
165+
"params": { "allowedRoles": ["admin", "developer"] }
166+
}
167+
]
137168
}
138-
]
169+
}
139170
},
171+
140172
"deploymentState": "deployed"
141173
}
142174
}
@@ -148,14 +180,10 @@ Use the **"Subscribe"** request. This registers a callback URL to receive events
148180

149181
```
150182
POST http://localhost:8080/repos/v1.0/hub
151-
Authorization: Basic admin:admin
183+
X-API-Key: <your-api-key>
152184
Content-Type: application/x-www-form-urlencoded
153185
154-
hub.mode=subscribe
155-
hub.topic=issues
156-
hub.callback=http://wh-listener:8090/
157-
hub.secret=mysecret
158-
hub.lease_seconds=3600
186+
hub.mode=subscribe&hub.topic=issues&hub.callback=http://wh-listener:8090/&hub.secret=mysecret&hub.lease_seconds=3600
159187
```
160188

161189
#### Step 3: Publish an Event (WebSub collection)
@@ -165,11 +193,22 @@ Use the **"Ingress"** request. This publishes an event to the `issues` channel.
165193
```
166194
POST http://localhost:8080/repos/v1.0/webhook-receiver?topic=issues
167195
Content-Type: text/plain
196+
X-Hub-Signature-256: sha256=<hmac-of-body>
168197
169198
issue0
170199
```
171200

172-
#### Step 4: Verify Delivery
201+
#### Step 4: Unsubscribe from a Topic (WebSub collection)
202+
203+
```
204+
POST http://localhost:8080/repos/v1.0/hub
205+
X-API-Key: <your-api-key>
206+
Content-Type: application/x-www-form-urlencoded
207+
208+
hub.mode=unsubscribe&hub.topic=issues&hub.callback=http://wh-listener:8090/
209+
```
210+
211+
#### Step 5: Verify Delivery
173212

174213
Check the webhook listener logs to confirm the event was delivered:
175214

@@ -207,35 +246,83 @@ curl -X POST http://localhost:9090/api/management/v0.9/websub-apis \
207246
"displayName": "repo-watcher",
208247
"version": "v1.0",
209248
"context": "/repos",
210-
"hub": {
211-
"channels": [
212-
{ "name": "issues" },
213-
{ "name": "pull-requests" },
214-
{ "name": "commits" }
249+
"policies": {
250+
"on_subscription": [
251+
{ "name": "api-key-auth", "version": "v1", "params": { "in": "header", "name": "X-API-Key" } }
215252
],
216-
"policies": [{
217-
"name": "basic-auth", "version": "v1",
218-
"params": { "username": "admin", "password": "admin" }
219-
}]
253+
"on_unsubscription": [
254+
{ "name": "api-key-auth", "version": "v1", "params": { "in": "header", "name": "X-API-Key" } }
255+
],
256+
"on_message_received": [],
257+
"on_message_delivery": []
258+
},
259+
"channels": {
260+
"issues": {},
261+
"pull-requests": {},
262+
"commits": {}
220263
},
221264
"deploymentState": "deployed"
222265
}
223266
}'
224267

225268
# 2. Subscribe to the "issues" topic
226269
curl -X POST http://localhost:8080/repos/v1.0/hub \
227-
-u admin:admin \
270+
-H "X-API-Key: <your-api-key>" \
228271
-d "hub.mode=subscribe&hub.topic=issues&hub.callback=http://wh-listener:8090/&hub.secret=mysecret&hub.lease_seconds=3600"
229272

230273
# 3. Publish an event
231274
curl -X POST "http://localhost:8080/repos/v1.0/webhook-receiver?topic=issues" \
232275
-H "Content-Type: text/plain" \
233276
-d "issue0"
234277

235-
# 4. Check delivery
278+
# 4. Unsubscribe
279+
curl -X POST http://localhost:8080/repos/v1.0/hub \
280+
-H "X-API-Key: <your-api-key>" \
281+
-d "hub.mode=unsubscribe&hub.topic=issues&hub.callback=http://wh-listener:8090/"
282+
283+
# 5. Check delivery
236284
docker compose logs wh-listener
237285
```
238286

287+
## WebSubApi Spec Reference
288+
289+
The `WebSubApi` kind is configured via `policies` and `channels`. Both are optional; omitting a policy point leaves it open (no enforcement).
290+
291+
```json
292+
{
293+
"apiVersion": "gateway.api-platform.wso2.com/v1alpha1",
294+
"kind": "WebSubApi",
295+
"metadata": { "name": "my-api" },
296+
"spec": {
297+
"displayName": "My API",
298+
"version": "v1.0",
299+
"context": "/my-api",
300+
301+
"policies": {
302+
"on_subscription": [ /* policies applied to every subscribe request */ ],
303+
"on_unsubscription": [ /* policies applied to every unsubscribe request */ ],
304+
"on_message_received":[ /* policies applied to every inbound event */ ],
305+
"on_message_delivery":[ /* policies applied to every outbound delivery */ ]
306+
},
307+
308+
"channels": {
309+
"<channel-name>": {
310+
"policies": {
311+
"on_subscription": [ /* channel-specific subscribe policies */ ],
312+
"on_unsubscription": [ /* channel-specific unsubscribe policies */ ],
313+
"on_message_received":[ /* channel-specific inbound policies */ ],
314+
"on_message_delivery":[ /* channel-specific outbound policies */ ]
315+
}
316+
}
317+
},
318+
319+
"deploymentState": "deployed"
320+
}
321+
}
322+
```
323+
324+
**Policy execution order:** `policies` policies run first, followed by the matching `channels` entry for that channel. Each policy object requires `name` and `version`; `params` is policy-specific.
325+
239326
## Configuration
240327

241328
### Runtime Configuration (`config.toml`)
@@ -276,7 +363,7 @@ When `websub_tls_enabled=true`, the event gateway serves `https://` on `websub_p
276363

277364
When the control plane is disabled, channels are loaded statically from [`gateway-runtime/configs/channels.yaml`](gateway-runtime/configs/channels.yaml). Two binding kinds are supported:
278365

279-
**WebSubApi** — Multi-channel API:
366+
**WebSubApi** — Multi-channel API. The `policies` block at root level maps to `policies`; each channel's `policies` block maps to its `channels` entry:
280367

281368
```yaml
282369
channels:
@@ -286,20 +373,42 @@ channels:
286373
context: /repos
287374
channels:
288375
- name: issues
376+
policies:
377+
subscribe: # → on_subscription (channel-level)
378+
- name: rbac
379+
version: v1
380+
params:
381+
allowedRoles: ["admin", "issue-manager"]
382+
unsubscribe: [] # → on_unsubscription (channel-level)
383+
inbound: [] # → on_message_received (channel-level)
384+
outbound: [] # → on_message_delivery (channel-level)
289385
- name: pull-requests
386+
policies:
387+
subscribe:
388+
- name: rbac
389+
version: v1
390+
params:
391+
allowedRoles: ["admin", "developer"]
392+
unsubscribe: []
393+
inbound: []
394+
outbound: []
290395
receiver:
291396
type: websub
292397
broker-driver:
293398
type: kafka
294399
config:
295400
brokers: ["kafka:29092"]
296-
policies:
297-
subscribe:
298-
- name: basic-auth
401+
policies: # → policies
402+
subscribe: # → on_subscription
403+
- name: api-key-auth
299404
version: v1
300-
params: { username: "admin", password: "admin" }
301-
inbound: []
302-
outbound: []
405+
params: { in: header, name: X-API-Key }
406+
unsubscribe: # → on_unsubscription
407+
- name: api-key-auth
408+
version: v1
409+
params: { in: header, name: X-API-Key }
410+
inbound: [] # → on_message_received
411+
outbound: [] # → on_message_delivery
303412
```
304413
305414
**Flat binding** — Protocol mediation (WebSocket → Kafka):
@@ -320,10 +429,20 @@ channels:
320429
brokers: ["kafka:29092"]
321430
policies:
322431
subscribe: []
432+
unsubscribe: []
323433
inbound: []
324434
outbound: []
325435
```
326436
437+
The four `policies` keys map directly to the control-plane spec fields:
438+
439+
| `channels.yaml` key | Control plane field | Triggered when |
440+
|---|---|---|
441+
| `subscribe` | `on_subscription` | Client subscribes at the hub |
442+
| `unsubscribe` | `on_unsubscription` | Client unsubscribes at the hub |
443+
| `inbound` | `on_message_received` | Event published via webhook receiver |
444+
| `outbound` | `on_message_delivery` | Event delivered to subscriber callback |
445+
327446
## Building from Source
328447

329448
```bash
@@ -339,6 +458,34 @@ make test
339458
docker compose build event-gateway
340459
```
341460

461+
## Running Integration Tests
462+
463+
The integration tests exercise the full stack (controller → event gateway → Kafka → delivery).
464+
465+
### Prerequisites
466+
467+
Make sure all services are running:
468+
469+
```bash
470+
cd event-gateway && docker compose up -d
471+
```
472+
473+
### Run the Tests
474+
475+
```bash
476+
cd event-gateway/gateway-runtime
477+
make test
478+
```
479+
480+
The integration test suite:
481+
482+
1. Creates a `WebSubApi` with `policies` and `channels` via the control plane REST API.
483+
2. Verifies the xDS snapshot is pushed to the event gateway.
484+
3. Subscribes a callback URL to each channel — `on_subscription` policies are enforced.
485+
4. Publishes events and asserts delivery to the callback — `on_message_received` and `on_message_delivery` policies are enforced.
486+
5. Unsubscribes and asserts `on_unsubscription` policies are enforced and no further delivery occurs.
487+
6. Deletes the API and asserts cleanup.
488+
342489
## Useful Endpoints
343490

344491
| Endpoint | Port | Description |
@@ -376,7 +523,7 @@ event-gateway/
376523
│ ├── cmd/event-gateway/ # Entry point and plugin registration
377524
│ ├── configs/
378525
│ │ ├── config.toml # Runtime configuration
379-
│ │ └── channels.yaml # Static channel bindings
526+
│ │ └── channels.yaml # Static channel bindings (used when control plane is disabled)
380527
│ └── internal/
381528
│ ├── admin/ # Health/readiness endpoints
382529
│ ├── binding/ # Channel binding parser

event-gateway/docker-compose.dev.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ services:
3333
- APIP_GW_ROUTER_EVENT__GATEWAY_WEBSUB__HUB_PORT=8080
3434
- APIP_GW_ROUTER_EVENT__GATEWAY_WEBSUB__HUB__LISTENER__PORT=8080
3535
- APIP_GW_ROUTER_EVENT__GATEWAY_TIMEOUT__SECONDS=30
36-
- APIP_GW_GATEWAY_REGISTRATION_TOKEN=${GATEWAY_REGISTRATION_TOKEN:?set GATEWAY_REGISTRATION_TOKEN in event-gateway/.env}
37-
- APIP_GW_CONTROLPLANE_HOST=${GATEWAY_CONTROLPLANE_HOST:?set GATEWAY_CONTROLPLANE_HOST in event-gateway/.env}
36+
- APIP_GW_GATEWAY_REGISTRATION_TOKEN=${GATEWAY_REGISTRATION_TOKEN:-}
37+
- APIP_GW_CONTROLPLANE_HOST=${GATEWAY_CONTROLPLANE_HOST:-}
3838

3939
volumes:
4040
- controller-data:/app/data

gateway/gateway-controller/api/management-openapi.yaml

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3761,20 +3761,28 @@ components:
37613761
description: Custom virtual host/domain for sandbox traffic
37623762
pattern: '^[a-zA-Z0-9\.\-]+$'
37633763
example: sandbox-api.example.com
3764-
allChannelPolicies:
3764+
policies:
37653765
$ref: '#/components/schemas/WebSubAllChannelPolicies'
3766-
channelPolicies:
3766+
channels:
37673767
type: object
3768-
description: Per-channel policy configuration keyed by channel name. Each key is a channel name and defines policies applied only to that channel.
3768+
description: Per-channel configuration keyed by channel name. Each key is a channel name and defines policies applied only to that channel.
37693769
additionalProperties:
3770-
$ref: '#/components/schemas/WebSubChannelPolicies'
3770+
$ref: '#/components/schemas/WebSubChannel'
37713771
deploymentState:
37723772
type: string
37733773
description: Desired deployment state - 'deployed' (default) or 'undeployed'. When set to 'undeployed', the API is removed from router traffic but configuration, API keys, and policies are preserved for potential redeployment.
37743774
enum: [deployed, undeployed]
37753775
default: deployed
37763776
example: deployed
37773777

3778+
# WebSubChannel defines a single channel entry with its per-channel policies.
3779+
WebSubChannel:
3780+
type: object
3781+
description: A single channel definition with optional per-channel policy overrides.
3782+
properties:
3783+
policies:
3784+
$ref: '#/components/schemas/WebSubChannelPolicies'
3785+
37783786
# WebSubAllChannelPolicies defines policies applied to all channels for each event type.
37793787
WebSubAllChannelPolicies:
37803788
type: object

0 commit comments

Comments
 (0)