|
3 | 3 | # See the LICENSE file for details. |
4 | 4 |
|
5 | 5 | # Python imports |
| 6 | +import logging |
| 7 | +import os |
6 | 8 | import socket |
7 | 9 | import ipaddress |
8 | 10 | from urllib.parse import urlparse |
9 | 11 |
|
10 | 12 | # Third party imports |
11 | 13 | from rest_framework import serializers |
12 | 14 |
|
| 15 | +# Allow private/loopback webhook URLs for self-hosted instances. |
| 16 | +# When enabled, SSRF protection (private/loopback/reserved/link-local IP |
| 17 | +# blocking) is bypassed for webhook URLs. Only enable this in trusted |
| 18 | +# self-hosted environments where webhooks target local services. |
| 19 | +ALLOW_PRIVATE_WEBHOOKS = os.environ.get("PLANE_ALLOW_PRIVATE_WEBHOOKS", "0").lower() in ("1", "true", "yes") |
| 20 | + |
| 21 | +if ALLOW_PRIVATE_WEBHOOKS: |
| 22 | + logging.getLogger(__name__).warning( |
| 23 | + "PLANE_ALLOW_PRIVATE_WEBHOOKS is enabled — webhooks can target " |
| 24 | + "private/internal IPs. Only enable this in trusted self-hosted " |
| 25 | + "environments." |
| 26 | + ) |
| 27 | + |
13 | 28 | # Module imports |
14 | 29 | from .base import DynamicBaseSerializer |
15 | 30 | from plane.db.models import Webhook, WebhookLog |
16 | 31 | from plane.db.models.webhook import validate_domain, validate_schema |
17 | 32 |
|
18 | 33 |
|
19 | | -class WebhookSerializer(DynamicBaseSerializer): |
20 | | - url = serializers.URLField(validators=[validate_schema, validate_domain]) |
| 34 | +def _validate_webhook_url(url, request): |
| 35 | + """Validate a webhook URL: resolve hostname, check IP restrictions, and |
| 36 | + block disallowed domains. |
21 | 37 |
|
22 | | - def create(self, validated_data): |
23 | | - url = validated_data.get("url", None) |
| 38 | + Raises ``serializers.ValidationError`` on failure. |
| 39 | + """ |
| 40 | + hostname = urlparse(url).hostname |
| 41 | + if not hostname: |
| 42 | + raise serializers.ValidationError({"url": "Invalid URL: No hostname found."}) |
24 | 43 |
|
25 | | - # Extract the hostname from the URL |
26 | | - hostname = urlparse(url).hostname |
27 | | - if not hostname: |
28 | | - raise serializers.ValidationError({"url": "Invalid URL: No hostname found."}) |
| 44 | + # Normalize hostname: strip trailing dot (FQDN) and lowercase to |
| 45 | + # prevent bypass via canonical variants like "plane.so." or "Plane.SO". |
| 46 | + hostname = hostname.rstrip(".").lower() |
29 | 47 |
|
30 | | - # Resolve the hostname to IP addresses |
31 | | - try: |
32 | | - ip_addresses = socket.getaddrinfo(hostname, None) |
33 | | - except socket.gaierror: |
34 | | - raise serializers.ValidationError({"url": "Hostname could not be resolved."}) |
| 48 | + # Resolve the hostname to IP addresses |
| 49 | + try: |
| 50 | + ip_addresses = socket.getaddrinfo(hostname, None) |
| 51 | + except socket.gaierror: |
| 52 | + raise serializers.ValidationError({"url": "Hostname could not be resolved."}) |
35 | 53 |
|
36 | | - if not ip_addresses: |
37 | | - raise serializers.ValidationError({"url": "No IP addresses found for the hostname."}) |
| 54 | + if not ip_addresses: |
| 55 | + raise serializers.ValidationError({"url": "No IP addresses found for the hostname."}) |
38 | 56 |
|
| 57 | + # Block private/loopback/reserved/link-local IPs unless explicitly allowed |
| 58 | + if not ALLOW_PRIVATE_WEBHOOKS: |
39 | 59 | for addr in ip_addresses: |
40 | 60 | ip = ipaddress.ip_address(addr[4][0]) |
41 | 61 | if ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local: |
42 | 62 | raise serializers.ValidationError({"url": "URL resolves to a blocked IP address."}) |
43 | 63 |
|
44 | | - # Additional validation for multiple request domains and their subdomains |
45 | | - request = self.context.get("request") |
46 | | - disallowed_domains = ["plane.so"] # Add your disallowed domains here |
47 | | - if request: |
48 | | - request_host = request.get_host().split(":")[0] # Remove port if present |
49 | | - disallowed_domains.append(request_host) |
| 64 | + # Additional validation for disallowed domains and their subdomains |
| 65 | + disallowed_domains = ["plane.so"] |
| 66 | + if request: |
| 67 | + request_host = request.get_host().split(":")[0].rstrip(".").lower() |
| 68 | + disallowed_domains.append(request_host) |
| 69 | + |
| 70 | + if any(hostname == domain or hostname.endswith("." + domain) for domain in disallowed_domains): |
| 71 | + raise serializers.ValidationError({"url": "URL domain or its subdomain is not allowed."}) |
| 72 | + |
50 | 73 |
|
51 | | - # Check if hostname is a subdomain or exact match of any disallowed domain |
52 | | - if any(hostname == domain or hostname.endswith("." + domain) for domain in disallowed_domains): |
53 | | - raise serializers.ValidationError({"url": "URL domain or its subdomain is not allowed."}) |
| 74 | +def _validate_domain_for_webhook(value): |
| 75 | + """Conditionally apply domain validation. |
54 | 76 |
|
| 77 | + When ALLOW_PRIVATE_WEBHOOKS is enabled, skip the domain validator so |
| 78 | + that loopback hosts like localhost / 127.0.0.1 are not rejected at the |
| 79 | + field level. The full URL validation in _validate_webhook_url() still |
| 80 | + enforces disallowed-domain blocking. |
| 81 | + """ |
| 82 | + if ALLOW_PRIVATE_WEBHOOKS: |
| 83 | + return |
| 84 | + validate_domain(value) |
| 85 | + |
| 86 | + |
| 87 | +class WebhookSerializer(DynamicBaseSerializer): |
| 88 | + url = serializers.URLField(validators=[validate_schema, _validate_domain_for_webhook]) |
| 89 | + |
| 90 | + def create(self, validated_data): |
| 91 | + url = validated_data.get("url", None) |
| 92 | + _validate_webhook_url(url, self.context.get("request")) |
55 | 93 | return Webhook.objects.create(**validated_data) |
56 | 94 |
|
57 | 95 | def update(self, instance, validated_data): |
58 | 96 | url = validated_data.get("url", None) |
59 | 97 | if url: |
60 | | - # Extract the hostname from the URL |
61 | | - hostname = urlparse(url).hostname |
62 | | - if not hostname: |
63 | | - raise serializers.ValidationError({"url": "Invalid URL: No hostname found."}) |
64 | | - |
65 | | - # Resolve the hostname to IP addresses |
66 | | - try: |
67 | | - ip_addresses = socket.getaddrinfo(hostname, None) |
68 | | - except socket.gaierror: |
69 | | - raise serializers.ValidationError({"url": "Hostname could not be resolved."}) |
70 | | - |
71 | | - if not ip_addresses: |
72 | | - raise serializers.ValidationError({"url": "No IP addresses found for the hostname."}) |
73 | | - |
74 | | - for addr in ip_addresses: |
75 | | - ip = ipaddress.ip_address(addr[4][0]) |
76 | | - if ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local: |
77 | | - raise serializers.ValidationError({"url": "URL resolves to a blocked IP address."}) |
78 | | - |
79 | | - # Additional validation for multiple request domains and their subdomains |
80 | | - request = self.context.get("request") |
81 | | - disallowed_domains = ["plane.so"] # Add your disallowed domains here |
82 | | - if request: |
83 | | - request_host = request.get_host().split(":")[0] # Remove port if present |
84 | | - disallowed_domains.append(request_host) |
85 | | - |
86 | | - # Check if hostname is a subdomain or exact match of any disallowed domain |
87 | | - if any(hostname == domain or hostname.endswith("." + domain) for domain in disallowed_domains): |
88 | | - raise serializers.ValidationError({"url": "URL domain or its subdomain is not allowed."}) |
89 | | - |
| 98 | + _validate_webhook_url(url, self.context.get("request")) |
90 | 99 | return super().update(instance, validated_data) |
91 | 100 |
|
92 | 101 | class Meta: |
|
0 commit comments