diff --git a/app/models/webhook.py b/app/models/webhook.py index 6ffcf40..66a4179 100644 --- a/app/models/webhook.py +++ b/app/models/webhook.py @@ -1,3 +1,5 @@ +from ipaddress import AddressValueError, IPv4Address, IPv6Address +from urllib.parse import urlparse """Webhook model for event notifications.""" import uuid @@ -9,6 +11,35 @@ from app.models.base import TimestampMixin, new_uuid +def validate_webhook_url(url: str) -> str: + """Validate webhook URL to prevent SSRF attacks.""" + try: + parsed = urlparse(url) + + # Only allow http/https schemes + if parsed.scheme not in ('http', 'https'): + raise ValueError("Only http and https schemes are allowed") + + # Block internal/private IP addresses + if parsed.hostname: + try: + ip = IPv4Address(parsed.hostname) + if ip.is_private or ip.is_loopback or ip.is_link_local: + raise ValueError("Private, loopback, and link-local addresses are not allowed") + except AddressValueError: + try: + ip = IPv6Address(parsed.hostname) + if ip.is_private or ip.is_loopback or ip.is_link_local: + raise ValueError("Private, loopback, and link-local addresses are not allowed") + except AddressValueError: + # Not an IP address, allow domain names + pass + + return url + except Exception as e: + raise ValueError(f"Invalid webhook URL: {e}") + + class WebhookEvent(StrEnum): SOURCE_INGESTED = "source.ingested" @@ -23,7 +54,7 @@ class Webhook(TimestampMixin, SQLModel, table=True): tenant_id: uuid.UUID = Field(foreign_key="tenants.id", nullable=False, index=True) url: str = Field(max_length=2048) secret: str = Field(max_length=256) # for HMAC signing - events: str = Field(sa_column=Column(Text, nullable=False)) # JSON array of event types + url: str = Field(max_length=2048, json_schema_extra={"validator": validate_webhook_url}) is_active: bool = Field(default=True) description: str = Field(default="", max_length=500) @@ -34,7 +65,7 @@ class Webhook(TimestampMixin, SQLModel, table=True): class WebhookCreate(SQLModel): url: str = Field(max_length=2048) events: list[str] - description: str = Field(default="", max_length=500) + url: str = Field(max_length=2048, json_schema_extra={"validator": validate_webhook_url}) secret: str | None = None