Commit 955c525

Add code for webhook dispatcher
A dispatcher is a new component that manages incoming webhooks from GitHub. It accepts new requests and manages a queue based on cluster capacity. Capacity is defined as the number of running pipelines in a given namespace. This simple metric provides a buffer for periods when we receive a bulk release of operators. The mechanism keeps pending requests in a database queue and triggers the related pipeline only when capacity is free.

The solution is made of:
- REST API
- Postgres database
- Dispatcher
- Capacity manager

JIRA: ISV-6108

Signed-off-by: Ales Raszka <araszka@redhat.com>
1 parent cc39c38 commit 955c525

25 files changed

Lines changed: 2868 additions & 1121 deletions

docker-compose.yml

Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
---
services:
  db:
    image: registry.redhat.io/rhel10/postgresql-16@sha256:8515941a42d6bc5d59b753839174b3c0e839315d686ca2fdd6fa65e1cccf3390
    container_name: webhook-dispatcher-db
    restart: unless-stopped
    environment:
      POSTGRESQL_USER: user
      POSTGRESQL_PASSWORD: password
      POSTGRESQL_DATABASE: webhook_dispatcher
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U $${POSTGRESQL_USER} -d $${POSTGRESQL_DATABASE}"]
      interval: 10s
      timeout: 5s
      retries: 5

volumes:
  pgdata:

docs/webhook-dispatecher.md

Lines changed: 173 additions & 0 deletions
@@ -0,0 +1,173 @@
# Webhook Dispatcher

The Webhook Dispatcher is a microservice that receives GitHub webhooks and processes them for operator pipelines.
It validates each webhook, stores the event in a database, and triggers the matching Tekton pipeline
when the capacity management system reports free capacity.

## Overview

The Webhook Dispatcher serves as the entry point for GitHub events in the operator pipelines.
When developers submit operators via pull requests, the dispatcher:

1. **Receives and validates** GitHub webhooks with signature verification
2. **Stores webhook events** in a PostgreSQL database
3. **Manages pipeline capacity** to prevent resource exhaustion
4. **Triggers Tekton pipelines** for operator validation
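
The capacity step in the list above can be pictured with a small in-memory sketch. It is illustrative only: the class and method names (`CapacityQueue`, `submit`, `pipeline_finished`) are hypothetical, and a local counter and `deque` stand in for the running-pipeline count and the PostgreSQL queue that the dispatcher is described as using.

```python
from collections import deque


class CapacityQueue:
    """Hypothetical FIFO buffer that releases events only while capacity is free."""

    def __init__(self, max_capacity: int) -> None:
        self.max_capacity = max_capacity
        self.running = 0        # stand-in for the number of running pipelines
        self.pending = deque()  # stand-in for the database queue
        self.triggered = []     # events whose pipeline has been started

    def submit(self, event: str) -> None:
        """Queue a new event, then try to dispatch."""
        self.pending.append(event)
        self._dispatch()

    def pipeline_finished(self) -> None:
        """Free one capacity slot, then try to dispatch."""
        self.running = max(0, self.running - 1)
        self._dispatch()

    def _dispatch(self) -> None:
        # Trigger queued events in arrival order until capacity is exhausted.
        while self.pending and self.running < self.max_capacity:
            self.triggered.append(self.pending.popleft())
            self.running += 1


queue = CapacityQueue(max_capacity=2)
for pr in ("pr-1", "pr-2", "pr-3"):
    queue.submit(pr)
print(queue.triggered, list(queue.pending))
```

With `max_capacity=2`, the third event stays pending until `pipeline_finished()` frees a slot, which is the buffering behaviour the overview describes.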

## Architecture

```
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│ GitHub          │    │ Webhook          │    │ PostgreSQL      │
│ Webhooks        │───▶│ Dispatcher       │───▶│ Database        │
└─────────────────┘    └──────────────────┘    └─────────────────┘


                       ┌──────────────────┐
                       │ Tekton           │
                       │ Pipelines        │
                       └──────────────────┘
```

## Features

- **Secure webhook processing** with HMAC-SHA256 signature verification
- **Multi-repository support** with configurable processing rules
- **Pipeline capacity management** to prevent resource overload
- **Background event processing** for improved performance
- **Health monitoring** and status tracking


## Installation

### Prerequisites
- Python 3.11+
- PostgreSQL 12+
- Kubernetes/OpenShift cluster access

### Setup

1. Install dependencies:
   ```bash
   pdm install --no-dev
   ```

2. Start the database:
   ```bash
   docker compose up -d
   ```

3. Set environment variables (the credentials below match the ones in `docker-compose.yml`):
   ```bash
   export DATABASE_URL="postgresql://user:password@localhost:5432/webhook_dispatcher"
   export GITHUB_WEBHOOK_SECRET="your_github_webhook_secret"
   ```

## Configuration

Create a `dispatcher_config.yaml` file:

```yaml
---
dispatcher:
  items:
    - name: "Community Operators"
      events:
        - opened
        - synchronize
        - ready_for_review
      full_repository_name: "redhat-openshift-ecosystem/community-operators"
      capacity:
        type: ocp_tekton
        pipeline_name: "operator-hosted-pipeline"
        max_capacity: 5
        namespace: "operator-pipeline-prod"
      # Tekton pipeline listener URL
      callback_url: "https://tekton-dashboard.example.com/api/v1/trigger"

security:
  github_webhook_secret: "${GITHUB_WEBHOOK_SECRET}"
  verify_signatures: true
  allowed_github_events:
    - pull_request
```

### Configuration Parameters

- **name**: Configuration identifier
- **events**: GitHub PR events to process
- **full_repository_name**: Repository in `owner/repo` format
- **capacity**: Pipeline capacity settings
- **callback_url**: Tekton pipeline trigger URL
- **github_webhook_secret**: Webhook signature verification secret
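
To make the role of `events` and `full_repository_name` concrete, here is a minimal sketch of how an incoming pull request event could be matched against the configured items. The helper `find_matching_item` and the inline `CONFIG` dictionary are hypothetical; they mirror the YAML keys above but are not the dispatcher's actual matching code.

```python
from typing import Any, Optional

# Hypothetical in-memory equivalent of the YAML configuration above.
CONFIG: dict[str, Any] = {
    "dispatcher": {
        "items": [
            {
                "name": "Community Operators",
                "events": ["opened", "synchronize", "ready_for_review"],
                "full_repository_name": "redhat-openshift-ecosystem/community-operators",
            }
        ]
    }
}


def find_matching_item(
    config: dict[str, Any], repository: str, action: str
) -> Optional[dict[str, Any]]:
    """Return the first item configured for this repository and PR action."""
    for item in config["dispatcher"]["items"]:
        if item["full_repository_name"] == repository and action in item["events"]:
            return item
    return None


match = find_matching_item(
    CONFIG, "redhat-openshift-ecosystem/community-operators", "opened"
)
print(match["name"] if match else "no match")
```

An event for a repository or action that is not listed yields `None`, which a caller could treat as "ignore this webhook".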

## Usage

### Running the Service

Production mode (the API module reads the configuration path from `WEBHOOK_DISPATCHER_CONFIG`):
```bash
export WEBHOOK_DISPATCHER_CONFIG="/path/to/dispatcher_config.yaml"
python -m operatorcert.webhook_dispatcher.main
```

### GitHub Webhook Setup

1. Go to repository **Settings** → **Webhooks**
2. Add a webhook with:
   - **URL**: `https://your-domain.com/api/v1/webhooks/github-pipeline`
   - **Content type**: `application/json`
   - **Secret**: Your webhook secret
   - **Events**: Pull requests

## API Endpoints

- `POST /api/v1/webhooks/github-pipeline` - Receive GitHub webhooks
- `GET /api/v1/status/ping` - Health check
- `GET /api/v1/status/db` - Database health
- `GET /api/v1/events/status` - Event status with pagination
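
The pagination of `/api/v1/events/status` follows the usual offset and limit arithmetic: page `n` starts at offset `(n - 1) * page_size`. The sketch below applies the same arithmetic to a plain list instead of a database query; the `paginate` helper is hypothetical, and only the response keys (`events`, `page`, `page_size`, `total_count`) are taken from the endpoint.

```python
from typing import Any, Sequence


def paginate(items: Sequence[Any], page: int = 1, page_size: int = 10) -> dict[str, Any]:
    """Slice items the way an offset/limit query would and report totals."""
    offset = (page - 1) * page_size
    return {
        "events": list(items[offset : offset + page_size]),
        "page": page,
        "page_size": page_size,
        "total_count": len(items),
    }


result = paginate(list(range(25)), page=3, page_size=10)
print(result["events"], result["total_count"])
```

A client can derive the number of pages as `ceil(total_count / page_size)` and stop once a page comes back empty.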

## Security

- **HMAC-SHA256 signature verification** for all webhook requests
- **GitHub User-Agent validation** to prevent spoofing
- **Event type filtering** based on configuration
- **TLS encryption** required for production deployments
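
As a rough illustration of the signature check, GitHub sends an `X-Hub-Signature-256` header of the form `sha256=<hex digest>`, computed over the raw request body with the webhook secret as the HMAC key. The sketch below uses only the Python standard library; `verify_signature` is a hypothetical helper, not the project's `validate_github_webhook` function.

```python
import hashlib
import hmac


def verify_signature(secret: str, body: bytes, signature_header: str) -> bool:
    """Return True when the header matches the HMAC-SHA256 of the raw body."""
    digest = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    expected = f"sha256={digest}"
    # compare_digest runs in constant time, so it does not leak how many
    # leading characters of the signature were correct.
    return hmac.compare_digest(expected.encode(), signature_header.encode())


body = b'{"action": "opened"}'
good = "sha256=" + hmac.new(b"s3cret", body, hashlib.sha256).hexdigest()
print(verify_signature("s3cret", body, good))
print(verify_signature("s3cret", body + b" ", good))
```

The check must run against the exact bytes received. Re-serializing the parsed JSON before hashing would change the digest and cause valid deliveries to be rejected.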

## Monitoring

### Health Checks
```bash
curl http://localhost:5000/api/v1/status/ping
curl http://localhost:5000/api/v1/status/db
```

### Event Status
```bash
curl "http://localhost:5000/api/v1/events/status?page_size=20"
```

## Troubleshooting

### Common Issues

**Signature verification failures**:
- Verify `GITHUB_WEBHOOK_SECRET` matches the secret configured in GitHub
- Ensure the webhook uses the `application/json` content type

**Database connection errors**:
- Check `DATABASE_URL` format
- Verify PostgreSQL is running and accessible

**Event processing delays**:
- Check dispatcher thread logs
- Monitor pipeline capacity usage
- Review database performance

### Debug Mode
```bash
export LOG_LEVEL=DEBUG
python -m operatorcert.webhook_dispatcher.main --verbose
```

For support, refer to the [operator-pipelines repository](https://github.com/redhat-openshift-ecosystem/operator-pipelines).
Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
---
dispatcher:
  items:
    - name: Red Hat community operators preprod
      events:
        - labeled
        - opened
        - reopened
        - synchronize
        - ready_for_review
        - closed
      full_repository_name: "redhat-openshift-ecosystem/community-operators-preprod"
      capacity:
        type: ocp_tekton
        pipeline_name: "operator-hosted-pipeline"
        max_capacity: 2
        namespace: "operator-pipeline-stage"
      callback_url: "https://example.com/foo" # Placeholder URL

security:
  # github_webhook_secret: "" # This can be also set by `GITHUB_WEBHOOK_SECRET`
  verify_signatures: true
  allowed_github_events:
    - pull_request

operator-pipeline-images/operatorcert/webhook_dispatcher/__init__.py

Whitespace-only changes.
Lines changed: 165 additions & 0 deletions
@@ -0,0 +1,165 @@
"""
Webhook API for handling GitHub pipeline events.
"""

import logging
import os
from typing import Any

import flask
from flask import jsonify, request
from operatorcert.webhook_dispatcher.config import load_config
from operatorcert.webhook_dispatcher.database import get_database, get_db_session
from operatorcert.webhook_dispatcher.models import (
    WebhookEvent,
    convert_to_webhook_event,
)
from operatorcert.webhook_dispatcher.security import validate_github_webhook

_CONFIG = None

LOGGER = logging.getLogger("operator-cert")
app = flask.Flask(__name__)


def get_config() -> Any:
    """
    Load and return the configuration.
    If the configuration is already loaded, return the cached version.
    """
    # singleton pattern for configuration loading
    global _CONFIG  # pylint: disable=global-statement
    if _CONFIG is None:
        _CONFIG = load_config(
            os.getenv("WEBHOOK_DISPATCHER_CONFIG", "dispatcher_config.json")
        )
    return _CONFIG


def event_to_dict(event: WebhookEvent) -> dict[str, Any]:
    """
    Convert a WebhookEvent object to a dictionary.
    """
    return {
        "id": event.id,
        "delivery_id": event.delivery_id,
        "repository_full_name": event.repository_full_name,
        "pull_request_number": event.pull_request_number,
        "processed": event.processed,
        "processing_error": event.processing_error,
        "received_at": event.received_at.isoformat(),
        "processed_at": event.processed_at.isoformat() if event.processed_at else None,
        "status": event.status,
    }


@app.route("/api/v1/webhooks/github-pipeline", methods=["POST"])
def github_pipeline_webhook() -> Any:
    """
    Receive GitHub webhook events for pipeline execution.

    Returns:
        Any: A JSON response indicating the status of the event processing.
    """
    payload = request.get_json()
    config = get_config()
    result = validate_github_webhook(request, config.security)
    if isinstance(result, tuple):
        LOGGER.debug("GitHub webhook validation failed: %s", result)
        return jsonify(result[0]), result[1]
    LOGGER.info("GitHub webhook validation result: %s", result)
    webhook_event = convert_to_webhook_event(payload, request)
    if webhook_event is None:
        return jsonify({"status": "rejected", "message": "Unsupported event"}), 400

    db_session = next(get_db_session())
    db_session.add(webhook_event)
    db_session.commit()

    return (
        jsonify(
            {
                "status": "ok",
                "message": "Event received",
                **event_to_dict(webhook_event),
            }
        ),
        200,
    )


@app.route("/api/v1/events/status", methods=["GET"])
def events_status() -> Any:
    """
    Get a summary of webhook events.

    Returns:
        Any: A JSON response containing the status of webhook events.
    """
    page = request.args.get("page", 1, type=int)
    page_size = request.args.get("page_size", 10, type=int)
    request_filter = request.args.get("filter", "")

    db_filter = []

    for item in request_filter.split(";"):
        if "=" in item:
            key, val = item.split("=", 1)  # split once so values may contain "="
            db_filter.append(getattr(WebhookEvent, key) == val)

    db_session = next(get_db_session())

    events = (
        db_session.query(WebhookEvent)
        .filter(*db_filter)
        .order_by(WebhookEvent.received_at.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
        .all()
    )
    total_count = db_session.query(WebhookEvent).filter(*db_filter).count()

    output = []
    for event in events:
        output.append(event_to_dict(event))

    return (
        jsonify(
            {
                "status": "ok",
                "events": output,
                "page": page,
                "page_size": page_size,
                "total_count": total_count,
            }
        ),
        200,
    )


@app.route("/api/v1/status/ping", methods=["GET"])
def get_ping() -> Any:
    """
    Health check endpoint to verify the API is running.

    Returns:
        Any: A JSON response indicating the API status.
    """
    return jsonify({"status": "pong"}), 200


@app.route("/api/v1/status/db", methods=["GET"])
def get_db_status() -> Any:
    """
    Health check endpoint to verify the database is running.

    Returns:
        Any: A JSON response indicating the database status.
    """
    db_manager = get_database()
    if db_manager.health_check():
        return jsonify({"status": "ok", "message": "Database is running"}), 200
    return (
        jsonify({"status": "error", "message": "Database health check failed"}),
        500,
    )
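
Review note: the `filter` query parameter in `events_status` is passed to `getattr(WebhookEvent, key)`, so an unknown key raises `AttributeError` and surfaces as a server error. One possible hardening, sketched below under the assumption that only a few columns should be filterable, is to parse the `key=value;key=value` string against an allow-list first. `parse_filter` and `ALLOWED_FILTER_KEYS` are hypothetical names and are not part of this commit.

```python
# Hypothetical allow-list; the column names are taken from event_to_dict above.
ALLOWED_FILTER_KEYS = frozenset(
    {"status", "repository_full_name", "pull_request_number"}
)


def parse_filter(raw: str, allowed: frozenset = ALLOWED_FILTER_KEYS) -> dict[str, str]:
    """Parse "key=value;key=value" into a dict, rejecting unknown keys."""
    parsed: dict[str, str] = {}
    for item in raw.split(";"):
        # partition splits on the first "=" only, so values may contain "=".
        key, separator, value = item.partition("=")
        if not separator:
            continue  # skip empty or malformed fragments
        if key not in allowed:
            raise ValueError(f"Unsupported filter key: {key}")
        parsed[key] = value
    return parsed


print(parse_filter("status=pending;repository_full_name=org/repo"))
```

A handler could catch the `ValueError` and return a 400 response instead of letting the request fail with a 500.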
