|
30 | 30 |
|
31 | 31 | # SSL Certificate Verification Configuration |
32 | 32 | # Set MCP_FETCH_SSL_VERIFY=false to disable SSL verification for internal/self-signed certificates |
33 | | -SSL_VERIFY = os.getenv("MCP_FETCH_SSL_VERIFY", "true").lower() == "true" |
| 33 | +# NOTE: Only explicit "false" disables verification; any other value (including typos) keeps it enabled. |
| 34 | +SSL_VERIFY = os.getenv("MCP_FETCH_SSL_VERIFY", "true").lower() != "false" |
34 | 35 |
|
35 | 36 | # SSRF Protection Configuration |
36 | 37 | # Set MCP_FETCH_ALLOW_PRIVATE_IPS=true to allow fetching from private/internal networks |
|
72 | 73 | ]) |
73 | 74 |
|
74 | 75 |
|
| 76 | +def _parse_obfuscated_ip(hostname: str) -> str | None: |
| 77 | + """ |
| 78 | + Detect and decode obfuscated IP address formats. |
| 79 | +
|
| 80 | + Attackers may use alternative IP representations to bypass SSRF filters: |
| 81 | + - Decimal: 2130706433 (= 127.0.0.1) |
| 82 | + - Octal: 0177.0.0.1 (= 127.0.0.1) |
| 83 | + - Hex: 0x7f000001 (= 127.0.0.1) |
| 84 | + - Mixed: 0x7f.0.0.1 (= 127.0.0.1) |
| 85 | +
|
| 86 | + Returns the normalized IP string if detected, None otherwise. |
| 87 | + """ |
| 88 | + hostname = hostname.strip() |
| 89 | + |
| 90 | + # Try decimal integer format (e.g., 2130706433 = 127.0.0.1) |
| 91 | + try: |
| 92 | + if hostname.isdigit(): |
| 93 | + ip_int = int(hostname) |
| 94 | + if 0 <= ip_int <= 0xFFFFFFFF: # Valid 32-bit range |
| 95 | + # Convert to dotted decimal |
| 96 | + return str(ipaddress.IPv4Address(ip_int)) |
| 97 | + except (ValueError, ipaddress.AddressValueError): |
| 98 | + pass |
| 99 | + |
| 100 | + # Try hex format (e.g., 0x7f000001 = 127.0.0.1) |
| 101 | + try: |
| 102 | + if hostname.lower().startswith("0x") and "." not in hostname: |
| 103 | + ip_int = int(hostname, 16) |
| 104 | + if 0 <= ip_int <= 0xFFFFFFFF: |
| 105 | + return str(ipaddress.IPv4Address(ip_int)) |
| 106 | + except (ValueError, ipaddress.AddressValueError): |
| 107 | + pass |
| 108 | + |
| 109 | + # Try octal/hex dotted format (e.g., 0177.0.0.1 or 0x7f.0.0.1) |
| 110 | + # Only return if there's actual obfuscation (hex prefix or leading zeros) |
| 111 | + if "." in hostname: |
| 112 | + parts = hostname.split(".") |
| 113 | + if len(parts) == 4: |
| 114 | + try: |
| 115 | + octets = [] |
| 116 | + has_obfuscation = False |
| 117 | + for part in parts: |
| 118 | + part = part.strip() |
| 119 | + if part.lower().startswith("0x"): |
| 120 | + octets.append(int(part, 16)) |
| 121 | + has_obfuscation = True |
| 122 | + elif part.startswith("0") and len(part) > 1 and part.isdigit(): |
| 123 | + # Octal format (leading zero with more digits) |
| 124 | + octets.append(int(part, 8)) |
| 125 | + has_obfuscation = True |
| 126 | + else: |
| 127 | + octets.append(int(part)) |
| 128 | + |
| 129 | + # Only return if we detected obfuscation AND result is valid |
| 130 | + if has_obfuscation and all(0 <= o <= 255 for o in octets): |
| 131 | + return f"{octets[0]}.{octets[1]}.{octets[2]}.{octets[3]}" |
| 132 | + except ValueError: |
| 133 | + pass |
| 134 | + |
| 135 | + return None |
| 136 | + |
| 137 | + |
75 | 138 | def _is_ip_private_or_reserved(ip_str: str) -> bool: |
76 | 139 | """ |
77 | 140 | Check if an IP address is private, reserved, loopback, or link-local. |
@@ -202,9 +265,22 @@ def validate_url_for_ssrf(url: str) -> None: |
202 | 265 | f"This hostname is associated with internal services.", |
203 | 266 | )) |
204 | 267 |
|
205 | | - # Try to parse hostname as IP address directly (handles obfuscation) |
| 268 | + # Check for obfuscated IP addresses (decimal, octal, hex encoding) |
| 269 | + # Python's ipaddress module does NOT parse these from strings, so we handle them explicitly |
| 270 | + obfuscated_ip = _parse_obfuscated_ip(hostname) |
| 271 | + if obfuscated_ip: |
| 272 | + if _is_ip_private_or_reserved(obfuscated_ip): |
| 273 | + if not ALLOW_PRIVATE_IPS: |
| 274 | + raise McpError(ErrorData( |
| 275 | + code=INVALID_PARAMS, |
| 276 | + message=f"Access to obfuscated private IP address '{hostname}' " |
| 277 | + f"(decoded: {obfuscated_ip}) is blocked. " |
| 278 | + f"Set MCP_FETCH_ALLOW_PRIVATE_IPS=true to allow internal network access.", |
| 279 | + )) |
| 280 | + return |
| 281 | + |
| 282 | + # Try to parse hostname as standard IP address |
206 | 283 | try: |
207 | | - # This handles decimal (2130706433), octal (0177.0.0.1), hex (0x7f.0.0.1) |
208 | 284 | ip = ipaddress.ip_address(hostname) |
209 | 285 | if _is_ip_private_or_reserved(str(ip)): |
210 | 286 | if not ALLOW_PRIVATE_IPS: |
@@ -305,7 +381,7 @@ async def check_may_autonomously_fetch_url(url: str, user_agent: str, proxy_url: |
305 | 381 | try: |
306 | 382 | response = await client.get( |
307 | 383 | robot_txt_url, |
308 | | - follow_redirects=True, |
| 384 | + follow_redirects=False, |
309 | 385 | headers={"User-Agent": user_agent}, |
310 | 386 | timeout=30, |
311 | 387 | ) |
@@ -383,7 +459,7 @@ async def fetch_url( |
383 | 459 | try: |
384 | 460 | response = await client.get( |
385 | 461 | url, |
386 | | - follow_redirects=True, |
| 462 | + follow_redirects=False, |
387 | 463 | headers={"User-Agent": user_agent}, |
388 | 464 | timeout=30, |
389 | 465 | ) |
|
0 commit comments