Skip to content

Commit b14e2b1

Browse files
Add files via upload
1 parent c59900e commit b14e2b1

2 files changed

Lines changed: 356 additions & 0 deletions

File tree

tests/test_check_security_hygiene.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,5 +125,108 @@ def test_os_fdopen_not_flagged(self):
125125
self.assertEqual(f, [])
126126

127127

128+
class TestModeBypassRegression(unittest.TestCase):
129+
"""FIX BUG-MODE-DYN: Vorher rutschte open(path, mode) mit Variable-Mode
130+
durch den Check, weil nur _ast.Constant akzeptiert wurde. Jetzt loest
131+
der Check Modul-Level-Konstanten auf."""
132+
133+
def test_dynamic_mode_via_module_const_detected(self):
134+
src = (
135+
'WRITE_MODE = "w"\n'
136+
'open("foo.txt", WRITE_MODE)\n'
137+
)
138+
f = _find_non_atomic_writes_in_src(src)
139+
self.assertEqual(len(f), 1, f"Erwartet 1 Finding, bekam {f}")
140+
self.assertEqual(f[0][1], "foo.txt")
141+
self.assertEqual(f[0][2], "w")
142+
143+
def test_dynamic_mode_via_kwarg_detected(self):
144+
src = (
145+
'M = "a"\n'
146+
'open("foo.txt", mode=M)\n'
147+
)
148+
f = _find_non_atomic_writes_in_src(src)
149+
self.assertEqual(len(f), 1)
150+
self.assertEqual(f[0][2], "a")
151+
152+
def test_dynamic_mode_pathlib_open_detected(self):
153+
src = (
154+
'from pathlib import Path\n'
155+
'M = "w"\n'
156+
'Path("foo.txt").open(M)\n'
157+
)
158+
f = _find_non_atomic_writes_in_src(src)
159+
self.assertEqual(len(f), 1)
160+
self.assertEqual(f[0][2], "w")
161+
162+
def test_truly_dynamic_mode_not_flagged(self):
163+
"""Wenn der Mode aus einer wirklich nicht-statischen Quelle kommt
164+
(Funktions-Argument, env, Berechnung), ist der Check still – das
165+
ist gewollt: keine False-Positives bei legitimen Wrappern.
166+
167+
Nur Modul-Level-Konstanten werden aufgeloest."""
168+
src = (
169+
'def f(mode):\n'
170+
' open("foo.txt", mode)\n'
171+
)
172+
f = _find_non_atomic_writes_in_src(src)
173+
self.assertEqual(f, [])
174+
175+
def test_constant_read_mode_via_var_not_flagged(self):
176+
"""Variable-Mode mit Read-Mode wird KORREKT nicht gemeldet."""
177+
src = (
178+
'M = "r"\n'
179+
'open("foo.txt", M)\n'
180+
)
181+
f = _find_non_atomic_writes_in_src(src)
182+
self.assertEqual(f, [])
183+
184+
185+
class TestHeredocRegexRegression(unittest.TestCase):
186+
"""FIX BUG-HEREDOC-INTERP: Die Heredoc-Erkennung muss neben 'python3 << EOF'
187+
auch 'python3.11', 'python', '<<-' und '-u/-B'-Flags akzeptieren – sonst
188+
wird Workflow-inline-Python in diesen Varianten vom Hygiene-Check
189+
komplett uebersprungen."""
190+
191+
HEREDOC_RE_SRC = (
192+
r"\bpython3?(?:\.\d+)?(?:\s+-\w+)*\s*<<-?\s*['\"]?(\w+)['\"]?\s*$"
193+
)
194+
195+
def _matches(self, line):
196+
import re
197+
return bool(re.search(self.HEREDOC_RE_SRC, line, re.MULTILINE))
198+
199+
def test_plain_python3_heredoc(self):
200+
self.assertTrue(self._matches("python3 << EOF"))
201+
202+
def test_python3_with_flags(self):
203+
self.assertTrue(self._matches("python3 -u << EOF"))
204+
self.assertTrue(self._matches("python3 -u -B << EOF"))
205+
206+
def test_python_with_minor_version(self):
207+
self.assertTrue(self._matches("python3.11 << EOF"))
208+
self.assertTrue(self._matches("python3.12 << PYEOF"))
209+
210+
def test_python_without_major_suffix(self):
211+
self.assertTrue(self._matches("python << EOF"))
212+
213+
def test_indent_strip_heredoc(self):
214+
self.assertTrue(self._matches("python3 <<- EOF"))
215+
216+
def test_quoted_delimiter(self):
217+
self.assertTrue(self._matches("python3 << 'EOF'"))
218+
self.assertTrue(self._matches('python3 << "EOF"'))
219+
220+
def test_python_c_oneliner_not_heredoc(self):
221+
self.assertFalse(self._matches("python3 -c 'foo'"))
222+
223+
def test_pythonsomething_not_match(self):
224+
"""'pythonic' oder 'pythonista' soll nicht wie 'python' aussehen."""
225+
# \b stellt sicher, dass nach 'python3?(\.\d+)?' kein Word-Char folgt
226+
# (da darauf \s+ oder \s*<< kommen muss).
227+
self.assertFalse(self._matches("pythonista << EOF"))
228+
self.assertFalse(self._matches("pythonic stuff << EOF"))
229+
230+
128231
if __name__ == "__main__":
129232
unittest.main(verbosity=2)

tests/test_netshield.py

Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,81 @@ def test_no_phantom_ip_from_cidr_network_address(self):
280280
# CIDR-only mit Whitespace davor (auto-discovery sieht das oft so)
281281
self.assertEqual(parse_entries(" 5.5.5.0/24"), {"5.5.5.0/24"})
282282

283+
# ─── Regression: FIX BUG-IPSET-EAGER ────────────────────────────────
284+
# Vorher matchte 'add\s+\S+\s+(\S+)' jede mit "add " beginnende Zeile,
285+
# extrahierte den 3. Token, und beendete die Zeile mit `continue`.
286+
# Wenn das Token keine IP war (Fliesstext, Kommentar, ein zweites Wort),
287+
# ging eine eventuell folgende echte IP verloren.
288+
289+
def test_ipset_eager_match_does_not_swallow_ip_in_freetext(self):
290+
"""'add notes here 1.2.3.4 important' darf 1.2.3.4 NICHT verlieren."""
291+
result = parse_entries("add notes here 1.2.3.4 important")
292+
self.assertEqual(result, {"1.2.3.4"})
293+
294+
def test_ipset_with_trailing_extra_ip(self):
295+
"""'add badguys 1.2.3.4 5.6.7.8' findet beide IPs (Fallback-Pfad)."""
296+
result = parse_entries("add badguys 1.2.3.4 5.6.7.8")
297+
self.assertEqual(result, {"1.2.3.4", "5.6.7.8"})
298+
299+
def test_fortigate_with_trailing_extra_ip(self):
300+
"""FortiGate-aehnliche Zeile mit zweiter IP: beide muessen rein."""
301+
# 255.255.255.255 ist Broadcast (240/4) und wird abgelehnt – das ist OK
302+
result = parse_entries("set subnet 1.2.3.4 8.8.8.8")
303+
self.assertEqual(result, {"1.2.3.4", "8.8.8.8"})
304+
305+
def test_ipset_private_value_does_not_leak_neighbor(self):
306+
"""ipset-Zeile mit privatem CIDR + Fliesstext-Anhang: privater
307+
CIDR korrekt verworfen, Fliesstext loest keinen Phantom-Eintrag aus."""
308+
result = parse_entries("add badguys 10.0.0.0/8")
309+
self.assertEqual(result, set()) # rein privat → leer
310+
311+
def test_add_prefix_is_not_a_freepass(self):
312+
"""Eine Zeile die nur zufaellig mit 'add' anfaengt darf den
313+
Inline-Parser nicht ueberspringen."""
314+
# "added" beginnt mit "add", Regex sollte nicht greifen (\b-Boundary
315+
# nicht explizit, aber \s+ verlangt Whitespace nach 'add'). Hier
316+
# testen wir nur: Multi-IP-Zeilen mit 'add'-Praefix verlieren keine IP.
317+
result = parse_entries("added 1.1.1.1 and 2.2.2.2 to blocklist")
318+
# 1.1.1.1 und 2.2.2.2 sind beide oeffentlich
319+
self.assertEqual(result, {"1.1.1.1", "2.2.2.2"})
320+
321+
# ─── Regression: FIX BUG-IPV6-MAPPED ────────────────────────────────
322+
# Der Lookbehind '(?<![\d.])' in IPV4_RE liess ':' als Trenner durch.
323+
# IPv4-mapped-IPv6 ('::ffff:1.2.3.4') und volle IPv6 mit IPv4-Suffix
324+
# ('2001:db8::ffff:192.0.2.1') erzeugten Phantom-IPv4-Eintraege.
325+
# Die Token-Heuristik prueft '::' / >=2 ':' im whitespace-Token.
326+
327+
def test_ipv4_mapped_ipv6_no_phantom(self):
328+
"""'::ffff:1.2.3.4' darf kein 1.2.3.4 erzeugen."""
329+
self.assertEqual(parse_entries("::ffff:1.2.3.4"), set())
330+
331+
def test_full_ipv6_with_v4_suffix_no_phantom(self):
332+
"""'2001:db8::ffff:192.0.2.1' (IPv4-mapped in vollem v6) → leer."""
333+
self.assertEqual(parse_entries("2001:db8::ffff:192.0.2.1"), set())
334+
335+
def test_ipv6_loopback_does_not_extract_anything(self):
336+
self.assertEqual(parse_entries("::1"), set())
337+
self.assertEqual(parse_entries("fe80::1234"), set())
338+
339+
def test_mixed_ipv4_and_ipv6_in_csv(self):
340+
"""CSV-Mischung: IPv4 muss extrahiert werden, IPv6 nicht."""
341+
result = parse_entries("1.2.3.4,2001:db8::1,5.6.7.8")
342+
self.assertEqual(result, {"1.2.3.4", "5.6.7.8"})
343+
344+
def test_ip_port_still_works_after_ipv6_fix(self):
345+
"""Sanity: 'IP:port' (1 Doppelpunkt) bleibt funktional."""
346+
self.assertEqual(parse_entries("1.2.3.4:8080"), {"1.2.3.4"})
347+
348+
def test_host_colon_ip_still_works(self):
349+
"""Sanity: 'host:1.2.3.4' (1 Doppelpunkt im Token) bleibt
350+
unbeeintraechtigt – das Token enthaelt nur 1 ':', kein '::'."""
351+
self.assertEqual(parse_entries("host:1.2.3.4"), {"1.2.3.4"})
352+
353+
def test_ipv6_token_neighbour_ipv4_not_swallowed(self):
354+
"""IPv4 in eigener Token-Position neben IPv6 wird gefunden."""
355+
result = parse_entries("foo 1.2.3.4 ::1 bar")
356+
self.assertEqual(result, {"1.2.3.4"})
357+
283358

284359
class TestParseEntriesForBlacklist(unittest.TestCase):
285360
"""FIX API-CLARITY: parse_entries_for_blacklist ist der empfohlene
@@ -594,6 +669,64 @@ def test_load_fp_set_missing_file(self):
594669
self.assertEqual(len(fp_ips), 0)
595670
self.assertEqual(len(fp_nets), 0)
596671

672+
# ─── Regression: FIX BUG-FP-STRICT ──────────────────────────────────
673+
# Vorher genuegte ein String wie "1.2.3.4" dem data.get("ips", []),
674+
# die for-Schleife iterierte ueber die Zeichen, und das Set enthielt
675+
# danach {'1', '.', '2', '3', '4'}. is_in_fp_set('.') wurde True –
676+
# beliebige Substrings galten als False-Positive.
677+
678+
def test_load_fp_set_ips_as_string_does_not_corrupt_state(self):
679+
"""ips=String darf das FP-Set NICHT mit Einzelzeichen befuellen."""
680+
with open(self.fp_path, 'w') as f:
681+
json.dump({"ips": "1.2.3.4"}, f)
682+
fp_ips, fp_nets = load_fp_set(self.fp_path)
683+
# Erwartung: leerer State (Schema-Reject), NICHT {'1','.','2',...}
684+
self.assertEqual(fp_ips, set())
685+
self.assertEqual(fp_nets, [])
686+
# Und: Punkt darf nicht als FP gelten
687+
self.assertFalse(is_in_fp_set("."))
688+
# Auch nicht "1" oder andere Einzelzeichen
689+
self.assertFalse(is_in_fp_set("1"))
690+
691+
def test_load_fp_set_root_not_dict_returns_empty(self):
692+
"""JSON-Root ist eine Liste oder ein String → leerer State."""
693+
# Liste statt dict
694+
with open(self.fp_path, 'w') as f:
695+
json.dump([], f)
696+
fp_ips, fp_nets = load_fp_set(self.fp_path)
697+
self.assertEqual(fp_ips, set())
698+
self.assertEqual(fp_nets, [])
699+
700+
# String statt dict
701+
with open(self.fp_path, 'w') as f:
702+
json.dump("hello", f)
703+
fp_ips, fp_nets = load_fp_set(self.fp_path)
704+
self.assertEqual(fp_ips, set())
705+
self.assertEqual(fp_nets, [])
706+
707+
def test_load_fp_set_skips_non_string_entries(self):
708+
"""Mix aus validen Strings und Datenmuell – nur Strings werden uebernommen."""
709+
with open(self.fp_path, 'w') as f:
710+
json.dump({"ips": ["1.2.3.4", None, 123, {"x": "y"}, "5.6.7.0/24"]}, f)
711+
fp_ips, fp_nets = load_fp_set(self.fp_path)
712+
self.assertEqual(fp_ips, {"1.2.3.4"})
713+
self.assertEqual(len(fp_nets), 1)
714+
715+
def test_load_fp_set_partial_fill_is_reset_on_schema_error(self):
716+
"""Wenn der Schema-Check FAILT NACH einem Teilbefuelle, muss der
717+
State wieder leer sein – kein 'Geister-FP-Set'."""
718+
# Vorher legitim gefuellt
719+
with open(self.fp_path, 'w') as f:
720+
json.dump({"ips": ["1.2.3.4", "9.9.9.9"]}, f)
721+
load_fp_set(self.fp_path)
722+
# Jetzt korrupte Datei (Root ist Liste, nicht Dict)
723+
with open(self.fp_path, 'w') as f:
724+
json.dump(["x"], f)
725+
fp_ips, fp_nets = load_fp_set(self.fp_path)
726+
# Re-Load mit Schema-Error → leer, NICHT alter State
727+
self.assertEqual(fp_ips, set())
728+
self.assertEqual(fp_nets, [])
729+
597730
def test_is_in_fp_set(self):
598731
netshield_common._fp_ips = {"1.2.3.4"}
599732
netshield_common._fp_networks = [
@@ -987,6 +1120,30 @@ def do_GET(self):
9871120
self.send_response(200)
9881121
self.end_headers()
9891122
self.wfile.write(b"feed-content\n1.2.3.4\n5.6.7.8")
1123+
elif self.path == "/gzip-small":
1124+
# Korrekt gepackter, kleiner gzip-Stream
1125+
import gzip as _gzip
1126+
payload = _gzip.compress(b"1.2.3.4\n5.6.7.8\n")
1127+
self.send_response(200)
1128+
self.end_headers()
1129+
self.wfile.write(payload)
1130+
elif self.path == "/gzip-bomb":
1131+
# Klassische zip-bomb: stark komprimierbarer Pseudo-Stream.
1132+
# 50 MiB Nullbytes komprimiert auf ~50 KB. Wenn fetch_url
1133+
# eager gzip.decompress aufruft (alter Code), wuerden
1134+
# waehrend der Decompression 50 MiB allokiert. Das
1135+
# Streaming-Limit muss nach read_limit Bytes abbrechen.
1136+
import gzip as _gzip
1137+
payload = _gzip.compress(b"\x00" * (50 * 1024 * 1024))
1138+
self.send_response(200)
1139+
self.end_headers()
1140+
self.wfile.write(payload)
1141+
elif self.path == "/gzip-broken":
1142+
# Gzip-Magic, aber kaputter Stream: muss als FEHLER
1143+
# behandelt werden, nicht crashen.
1144+
self.send_response(200)
1145+
self.end_headers()
1146+
self.wfile.write(b"\x1f\x8bnot-a-real-gzip-stream")
9901147
elif self.path == "/redirect-safe":
9911148
self.send_response(302)
9921149
# Redirect auf eigenen /ok-Pfad (auch lokal → blockiert
@@ -1091,6 +1248,35 @@ def test_read_limit_respected(self):
10911248
# read_limit=5 → nur die ersten 5 Bytes
10921249
self.assertEqual(len(result), 5)
10931250

1251+
# ─── Regression: FIX BUG-GZIP-BOMB ──────────────────────────────────
1252+
# Vorher: gzip.decompress(data) lud das gesamte expandierte Ergebnis in
1253+
# den Speicher, BEVOR der Limit-Check griff. Eine 50 KB komprimierte
1254+
# zip-bomb konnte 50 MiB allokieren – bei groesseren Bombs OOM. Jetzt:
1255+
# streaming via GzipFile mit harter read(read_limit + 1) Grenze.
1256+
1257+
def test_gzip_small_decompressed_correctly(self):
1258+
"""Sanity: kleiner gzip-Stream wird transparent dekomprimiert."""
1259+
from netshield_common import fetch_url
1260+
result = fetch_url(self._url("/gzip-small"))
1261+
self.assertIsNotNone(result)
1262+
self.assertIn("1.2.3.4", result)
1263+
self.assertIn("5.6.7.8", result)
1264+
1265+
def test_gzip_bomb_aborts_below_read_limit(self):
1266+
"""Bomb (50 MiB nullbytes komprimiert) mit read_limit=1MB:
1267+
Streaming-Decompress muss abbrechen und None liefern, NICHT die
1268+
Bombe materialisieren."""
1269+
from netshield_common import fetch_url
1270+
result = fetch_url(self._url("/gzip-bomb"), read_limit=1 * 1024 * 1024)
1271+
# Erwartet: None (Fetch verworfen)
1272+
self.assertIsNone(result)
1273+
1274+
def test_gzip_broken_stream_returns_none(self):
1275+
"""Gzip-Magic mit kaputtem Inhalt: kein Crash, nur None."""
1276+
from netshield_common import fetch_url
1277+
result = fetch_url(self._url("/gzip-broken"))
1278+
self.assertIsNone(result)
1279+
10941280

10951281
# ═══════════════════════════════════════════════════════════════
10961282
# Coverage-Tests: is_protected_entry — CIDR- und Sonderfälle
@@ -1516,6 +1702,73 @@ def sentinel(host, port, *a, **kw):
15161702
netshield_common._original_getaddrinfo = None
15171703

15181704

1705+
class TestPinRestore(unittest.TestCase):
1706+
"""FIX BUG-PIN-RESTORE: _pin_host gibt jetzt den vorherigen Pin zurueck,
1707+
_restore_pin stellt ihn nach dem Fetch wieder her – statt den Pin
1708+
bedingungslos zu loeschen. Schuetzt vor:
1709+
(a) Re-Pinning desselben Hosts innerhalb eines fetch_url (Redirect
1710+
gleicher-Host, anderer-Pfad → Pin wuerde im finally weggewischt
1711+
obwohl noch ein gueltiger State davor stand)
1712+
(b) verschachtelten fetch_url-Aufrufen die zufaellig denselben Host
1713+
treffen.
1714+
"""
1715+
1716+
def setUp(self):
1717+
# Pin-State sauber initialisieren
1718+
if hasattr(netshield_common._pin_state, "pin_map"):
1719+
netshield_common._pin_state.pin_map.clear()
1720+
1721+
def tearDown(self):
1722+
if hasattr(netshield_common._pin_state, "pin_map"):
1723+
netshield_common._pin_state.pin_map.clear()
1724+
1725+
def test_pin_returns_absent_sentinel_when_no_previous(self):
1726+
prev = netshield_common._pin_host("example.com", ["1.2.3.4"])
1727+
self.assertIs(prev, netshield_common._PIN_ABSENT)
1728+
self.assertEqual(
1729+
netshield_common._pin_state.pin_map["example.com"], ["1.2.3.4"])
1730+
1731+
def test_pin_returns_previous_when_repinning(self):
1732+
netshield_common._pin_host("example.com", ["1.2.3.4"])
1733+
prev = netshield_common._pin_host("example.com", ["5.6.7.8"])
1734+
self.assertEqual(prev, ["1.2.3.4"])
1735+
self.assertEqual(
1736+
netshield_common._pin_state.pin_map["example.com"], ["5.6.7.8"])
1737+
1738+
def test_restore_with_absent_sentinel_deletes_pin(self):
1739+
prev = netshield_common._pin_host("example.com", ["1.2.3.4"])
1740+
netshield_common._restore_pin("example.com", prev)
1741+
self.assertNotIn("example.com", netshield_common._pin_state.pin_map)
1742+
1743+
def test_restore_with_previous_value_overwrites(self):
1744+
prev1 = netshield_common._pin_host("example.com", ["1.2.3.4"]) # absent
1745+
prev2 = netshield_common._pin_host("example.com", ["5.6.7.8"]) # ["1.2.3.4"]
1746+
# innerer Cleanup: zurueck auf prev2 = ["1.2.3.4"]
1747+
netshield_common._restore_pin("example.com", prev2)
1748+
self.assertEqual(
1749+
netshield_common._pin_state.pin_map["example.com"], ["1.2.3.4"])
1750+
# aeusserer Cleanup: zurueck auf prev1 = absent → geloescht
1751+
netshield_common._restore_pin("example.com", prev1)
1752+
self.assertNotIn("example.com", netshield_common._pin_state.pin_map)
1753+
1754+
def test_lifo_unwinding_preserves_correct_state(self):
1755+
"""Reihenfolge des Restorens muss LIFO sein, sonst wird der
1756+
falsche Pin oben gelassen. fetch_url macht das per
1757+
`for ... in reversed(pinned_hosts)`."""
1758+
# Drei Pin-Operationen auf demselben Host
1759+
prevs = []
1760+
prevs.append(netshield_common._pin_host("h", ["A"]))
1761+
prevs.append(netshield_common._pin_host("h", ["B"]))
1762+
prevs.append(netshield_common._pin_host("h", ["C"]))
1763+
# State jetzt: ["C"]
1764+
self.assertEqual(netshield_common._pin_state.pin_map["h"], ["C"])
1765+
# LIFO unwind
1766+
for prev in reversed(prevs):
1767+
netshield_common._restore_pin("h", prev)
1768+
# Stack vollstaendig abgewickelt → kein Mapping mehr
1769+
self.assertNotIn("h", netshield_common._pin_state.pin_map)
1770+
1771+
15191772
# ═══════════════════════════════════════════════════════════════
15201773
# Regression: BUG-WL1-STRICT (Fail-Open via korrupter whitelist.json)
15211774
# ═══════════════════════════════════════════════════════════════

0 commit comments

Comments
 (0)