Skip to content

Commit 363843f

Browse files
Add files via upload
1 parent 0fdfae4 commit 363843f

1 file changed

Lines changed: 155 additions & 0 deletions

File tree

tests/test_netshield.py

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
write_ip_list,
3737
check_local_feed_age,
3838
fetch_url,
39+
validate_auto_feeds,
3940
)
4041
import netshield_common
4142

@@ -1828,5 +1829,159 @@ def test_loaded_flag_only_true_after_full_validation(self):
18281829
"_whitelist_loaded darf nach failendem Load nicht True sein")
18291830

18301831

1832+
class TestValidateAutoFeeds(unittest.TestCase):
    """FIX BUG-AUTOFEEDS-VALIDATE: validate_auto_feeds filters
    auto_discovered_feeds.json entries down to a safe schema (a dict
    carrying string name/url) plus a URL scheme whitelist (http/https
    only).

    Before the fix: update_combined_blacklist read the file directly via
    auto_data.get('feeds', []) with no checking at all -- an attacker
    with repo write access could smuggle malicious URLs into the feed
    loop, bypassing the code-review path that SOURCES goes through."""

    def _assert_single_reject(self, entry):
        # Shared shape for the rejection tests: a single bad entry must
        # produce an empty accept list and exactly one rejection.
        kept, dropped = validate_auto_feeds({"feeds": [entry]})
        self.assertEqual(dropped, 1)
        self.assertEqual(kept, [])

    def test_accepts_valid_https_feeds(self):
        payload = {"feeds": [
            {"name": "good1", "url": "https://example.com/feed.txt"},
            {"name": "good2", "url": "https://other.example.org/list"},
        ]}
        kept, dropped = validate_auto_feeds(payload)
        self.assertEqual(dropped, 0)
        self.assertEqual(len(kept), 2)

    def test_accepts_http_too(self):
        """http:// is allowed -- fetch_url's SSRF protection sits in
        between anyway, and some legacy feeds still use http."""
        kept, dropped = validate_auto_feeds(
            {"feeds": [{"name": "ok", "url": "http://example.com/x"}]})
        self.assertEqual(dropped, 0)
        self.assertEqual(len(kept), 1)

    def test_rejects_file_url(self):
        """file:// is a clear SSRF / local-file-read vector."""
        self._assert_single_reject(
            {"name": "evil", "url": "file:///etc/passwd"})

    def test_rejects_ftp_url(self):
        self._assert_single_reject(
            {"name": "evil", "url": "ftp://attacker.com/list"})

    def test_rejects_data_url(self):
        """data:// can carry inline payloads -- never fetch it."""
        self._assert_single_reject(
            {"name": "evil", "url": "data:text/plain,1.2.3.4"})

    def test_rejects_javascript_url(self):
        self._assert_single_reject(
            {"name": "evil", "url": "javascript:alert(1)"})

    def test_rejects_missing_url_field(self):
        self._assert_single_reject({"name": "broken"})

    def test_rejects_missing_name_field(self):
        self._assert_single_reject({"url": "https://example.com/x"})

    def test_rejects_non_string_url(self):
        self._assert_single_reject({"name": "x", "url": 12345})

    def test_rejects_non_dict_entry(self):
        bad_entries = ["not-a-dict", ["also", "not"], 42, None]
        kept, dropped = validate_auto_feeds({"feeds": bad_entries})
        self.assertEqual(dropped, 4)
        self.assertEqual(kept, [])

    def test_mixed_good_and_bad_partial_accept(self):
        """Mixed input: accept the good entries, count the bad ones."""
        payload = {"feeds": [
            {"name": "good", "url": "https://example.com/feed"},
            {"name": "evil", "url": "file:///etc/passwd"},
            {"name": "broken"},  # missing url
            {"name": "alsoOK", "url": "https://other.org/feed"},
        ]}
        kept, dropped = validate_auto_feeds(payload)
        self.assertEqual({entry["name"] for entry in kept},
                         {"good", "alsoOK"})
        self.assertEqual(len(kept), 2)
        self.assertEqual(dropped, 2)

    def test_root_not_dict_raises(self):
        with self.assertRaises(ValueError):
            validate_auto_feeds(["not", "a", "dict"])

    def test_feeds_not_list_raises(self):
        with self.assertRaises(ValueError):
            validate_auto_feeds({"feeds": "not-a-list"})

    def test_feeds_field_missing_returns_empty(self):
        """A missing 'feeds' field is fine (mirrors the original
        .get(...,[]) semantics)."""
        kept, dropped = validate_auto_feeds({})
        self.assertEqual(dropped, 0)
        self.assertEqual(kept, [])

    def test_empty_feeds_list(self):
        kept, dropped = validate_auto_feeds({"feeds": []})
        self.assertEqual(dropped, 0)
        self.assertEqual(kept, [])
class TestParseEntriesAsExtractIPsDropIn(unittest.TestCase):
    """FIX BUG-WF5-IPV6-ASN / WF6-IPV6-HEALTH: workflows that previously
    ran their own IPV4_RE.finditer loops (asn_reputation_scorer,
    feed_health_monitor) now use parse_entries as a drop-in. The drop-in
    contract: no phantom IPv4s materialize out of IPv6-mapped tokens,
    while genuine IPv4s in the same input survive."""

    def test_ipv6_mapped_token_does_not_create_phantom(self):
        """The classic phantom case: '::ffff:1.2.3.4' on its own."""
        self.assertEqual(parse_entries("::ffff:1.2.3.4"), set())

    def test_real_ipv4_alongside_ipv6_token_survives(self):
        """Mixed input: a genuine IPv4 must not get discarded along
        with the IPv6-token filtering."""
        mixed = "\n".join(
            ["::ffff:1.2.3.4", "5.6.7.8", "::1", "9.10.11.12"])
        self.assertEqual(parse_entries(mixed), {"5.6.7.8", "9.10.11.12"})

    def test_et_feed_style_input_with_phantom_attempt(self):
        """Simulates a poisoned ET feed: an attacker tries to smuggle
        in an IPv6-mapped IPv4 to skew the ASN score of an innocent
        holder."""
        # 1.2.3.4 is genuine here; 5.6.7.8 arrives IPv6-mapped (must
        # NOT count as ET-confirmed and must not trigger the et_bonus)
        feed_text = (
            "# Emerging Threats compromised IPs\n"
            "1.2.3.4\n"
            "::ffff:5.6.7.8\n"
            "9.10.11.12\n"
        )
        parsed = parse_entries(feed_text)
        self.assertNotIn("5.6.7.8", parsed, "Phantom-IPv4 darf nicht")
        self.assertIn("1.2.3.4", parsed)
        self.assertIn("9.10.11.12", parsed)

    def test_health_monitor_sample_with_only_ipv6_mapped(self):
        """feed_health_monitor: a feed that delivers only IPv6-mapped
        IPs must report has_ips=False (i.e. ip_count==0). Before the
        fix, IP_RE.findall would have counted the phantom IPv4s ->
        falsely True."""
        sample = "::ffff:1.2.3.4\n::ffff:5.6.7.8\n2001:db8::1\n"
        self.assertEqual(len(parse_entries(sample)), 0)
# Standard script entry point: run the whole suite directly;
# verbosity=2 prints one line per test method.
if __name__ == "__main__":
    unittest.main(verbosity=2)

0 commit comments

Comments
 (0)