|
| 1 | +import socket |
| 2 | +from unittest.mock import patch |
| 3 | + |
| 4 | +import requests |
| 5 | + |
| 6 | +from dojo.utils_ssrf import SSRFError, _SSRFSafeAdapter, make_ssrf_safe_session, validate_url_for_ssrf # noqa: PLC2701 |
| 7 | +from unittests.dojo_test_case import DojoTestCase |
| 8 | + |
| 9 | + |
| 10 | +def _addr_info(ip, port=80): |
| 11 | + """Build a minimal getaddrinfo-style return value for a single IP.""" |
| 12 | + return [(socket.AF_INET, socket.SOCK_STREAM, 6, "", (ip, port))] |
| 13 | + |
| 14 | + |
| 15 | +_MIXED_ADDR_INFO = [ |
| 16 | + (socket.AF_INET, socket.SOCK_STREAM, 6, "", ("8.8.8.8", 80)), |
| 17 | + (socket.AF_INET, socket.SOCK_STREAM, 6, "", ("192.168.1.1", 80)), |
| 18 | +] |
| 19 | + |
| 20 | + |
| 21 | +class TestValidateUrlForSsrf(DojoTestCase): |
| 22 | + |
| 23 | + @patch("dojo.utils_ssrf.socket.getaddrinfo", return_value=_addr_info("8.8.8.8")) |
| 24 | + def test_valid_public_url_does_not_raise(self, mock_getaddrinfo): |
| 25 | + validate_url_for_ssrf("http://example.com/api") # should not raise |
| 26 | + |
| 27 | + def test_file_scheme_raises(self): |
| 28 | + with self.assertRaisesRegex(SSRFError, "not permitted"): |
| 29 | + validate_url_for_ssrf("file:///etc/passwd") |
| 30 | + |
| 31 | + def test_gopher_scheme_raises(self): |
| 32 | + with self.assertRaisesRegex(SSRFError, "not permitted"): |
| 33 | + validate_url_for_ssrf("gopher://example.com") |
| 34 | + |
| 35 | + def test_no_hostname_raises(self): |
| 36 | + with self.assertRaisesRegex(SSRFError, "no hostname"): |
| 37 | + validate_url_for_ssrf("http://") |
| 38 | + |
| 39 | + def test_loopback_ip_raises(self): |
| 40 | + with self.assertRaisesRegex(SSRFError, "non-public address"): |
| 41 | + validate_url_for_ssrf("http://127.0.0.1/") |
| 42 | + |
| 43 | + def test_private_class_c_raises(self): |
| 44 | + with self.assertRaisesRegex(SSRFError, "non-public address"): |
| 45 | + validate_url_for_ssrf("http://192.168.1.1/") |
| 46 | + |
| 47 | + def test_private_class_a_raises(self): |
| 48 | + with self.assertRaisesRegex(SSRFError, "non-public address"): |
| 49 | + validate_url_for_ssrf("http://10.0.0.1/") |
| 50 | + |
| 51 | + def test_link_local_raises(self): |
| 52 | + with self.assertRaisesRegex(SSRFError, "non-public address"): |
| 53 | + validate_url_for_ssrf("http://169.254.1.1/") |
| 54 | + |
| 55 | + @patch("dojo.utils_ssrf.socket.getaddrinfo", side_effect=socket.gaierror("Name or service not known")) |
| 56 | + def test_unresolvable_hostname_raises(self, mock_getaddrinfo): |
| 57 | + with self.assertRaisesRegex(SSRFError, "Unable to resolve"): |
| 58 | + validate_url_for_ssrf("http://nonexistent.invalid/") |
| 59 | + |
| 60 | + @patch("dojo.utils_ssrf.socket.getaddrinfo", return_value=_MIXED_ADDR_INFO) |
| 61 | + def test_multi_address_with_private_ip_raises(self, mock_getaddrinfo): |
| 62 | + with self.assertRaisesRegex(SSRFError, "non-public address"): |
| 63 | + validate_url_for_ssrf("http://example.com/") |
| 64 | + |
| 65 | + |
| 66 | +class TestMakeSsrfSafeSession(DojoTestCase): |
| 67 | + |
| 68 | + def test_returns_requests_session(self): |
| 69 | + session = make_ssrf_safe_session() |
| 70 | + self.assertIsInstance(session, requests.Session) |
| 71 | + |
| 72 | + def test_http_and_https_mounted_with_safe_adapter(self): |
| 73 | + session = make_ssrf_safe_session() |
| 74 | + self.assertIsInstance(session.get_adapter("http://example.com"), _SSRFSafeAdapter) |
| 75 | + self.assertIsInstance(session.get_adapter("https://example.com"), _SSRFSafeAdapter) |
0 commit comments