Skip to content

Commit 75756b6

Browse files
Add files via upload
1 parent b14e2b1 commit 75756b6

2 files changed

Lines changed: 213 additions & 48 deletions

File tree

scripts/check_security_hygiene.py

Lines changed: 46 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,30 @@ def _resolve_pathlib_receiver(receiver, static_strings):
247247
return None
248248

249249

250+
def _resolve_mode_arg(mode_arg, static_strings):
251+
"""Loest einen Mode-AST-Node zu einem String auf.
252+
253+
FIX BUG-MODE-DYN: Vorher wurde nur isinstance(mode_arg, _ast.Constant)
254+
akzeptiert. Code wie
255+
256+
WRITE_MODE = "w"
257+
open(path, WRITE_MODE)
258+
259+
rutschte komplett durch den Hygiene-Check – ein triviater Bypass-Vektor.
260+
Jetzt: Wenn der Mode eine Variable ist, wird im static_strings-Mapping
261+
nachgeschlagen (analog zur Pfad-Aufloesung).
262+
263+
Returns:
264+
str | None: Mode als String, falls statisch bestimmbar.
265+
"""
266+
import ast as _ast
267+
if isinstance(mode_arg, _ast.Constant) and isinstance(mode_arg.value, str):
268+
return mode_arg.value
269+
if isinstance(mode_arg, _ast.Name):
270+
return static_strings.get(mode_arg.id)
271+
return None
272+
273+
250274
def _find_non_atomic_writes_in_src(source_text):
251275
"""Findet alle non-atomaren Write-Calls per AST-Walk.
252276
@@ -305,11 +329,12 @@ def _find_non_atomic_writes_in_src(source_text):
305329
break
306330
if mode_arg is None:
307331
continue
308-
if not (isinstance(mode_arg, _ast.Constant)
309-
and isinstance(mode_arg.value, str)
310-
and mode_arg.value in _WRITE_MODES):
332+
# FIX BUG-MODE-DYN: Variablen-Modes ueber Modul-Konstanten aufloesen,
333+
# statt sie als "kann ich nicht statisch entscheiden" durchzulassen.
334+
mode_value = _resolve_mode_arg(mode_arg, static_strings)
335+
if mode_value is None or mode_value not in _WRITE_MODES:
311336
continue
312-
mode_label = mode_arg.value
337+
mode_label = mode_value
313338
path_arg = node.args[0]
314339
path_str = _resolve_path_arg(path_arg, static_strings)
315340

@@ -329,11 +354,11 @@ def _find_non_atomic_writes_in_src(source_text):
329354
break
330355
if mode_arg is None:
331356
continue
332-
if not (isinstance(mode_arg, _ast.Constant)
333-
and isinstance(mode_arg.value, str)
334-
and mode_arg.value in _WRITE_MODES):
357+
# FIX BUG-MODE-DYN: konsistent zu Pattern 1.
358+
mode_value = _resolve_mode_arg(mode_arg, static_strings)
359+
if mode_value is None or mode_value not in _WRITE_MODES:
335360
continue
336-
mode_label = mode_arg.value
361+
mode_label = mode_value
337362
path_str = _resolve_pathlib_receiver(node.func.value, static_strings)
338363
elif attr in ("write_text", "write_bytes"):
339364
# Path.write_text(...) ist immer non-atomar (single
@@ -391,10 +416,17 @@ def _report(origin, lineno, path_str, mode):
391416
if WORKFLOWS_DIR.is_dir():
392417
# FIX HEREDOC-FLAGS: '-?' matchte vorher nur ein einzelnes '-'.
393418
# 'python3 -u << EOF' (unbuffered, sehr verbreitet in CI) und
394-
# 'python3 -B << EOF' rutschten durch. Jetzt beliebig viele
395-
# Single-Char- oder Multi-Char-Flags zulassen.
419+
# 'python3 -B << EOF' rutschten durch.
420+
# FIX BUG-HEREDOC-INTERP: Vorher matchten zusaetzlich nicht:
421+
# - 'python3.11 << EOF' / 'python3.12 …' (gepinnte Versionen)
422+
# - 'python << EOF' (ohne Major-Suffix)
423+
# - 'python3 <<- EOF' (heredoc mit indent-strip)
424+
# Workflow-inline-Python in diesen Varianten wurde komplett
425+
# uebersprungen, der Hygiene-Check fand dort weder non-atomare
426+
# Writes noch ungeschuetzte Fetches – ein stiller Bypass-Vektor.
396427
heredoc_start = re.compile(
397-
r"python3(?:\s+-\w+)*\s*<<\s*['\"]?(\w+)['\"]?\s*$", re.MULTILINE)
428+
r"\bpython3?(?:\.\d+)?(?:\s+-\w+)*\s*<<-?\s*['\"]?(\w+)['\"]?\s*$",
429+
re.MULTILINE)
398430
import textwrap as _tw
399431
for wf in sorted(WORKFLOWS_DIR.glob("*.yml")):
400432
content = wf.read_text(encoding="utf-8")
@@ -572,8 +604,10 @@ def _is_static_url_node(n):
572604
if WORKFLOWS_DIR.is_dir():
573605
# FIX HEREDOC-FLAGS: konsistent zu check_atomic_writes – auch
574606
# python3 -u/-B/-X… erkennen.
607+
# FIX BUG-HEREDOC-INTERP: gleiche Lockerung wie in check_atomic_writes
608+
# (python3.11, python, '<<-' Heredoc mit indent-strip).
575609
heredoc_re = re.compile(
576-
r"python3(?:\s+-\w+)*\s*<<\s*['\"]?(\w+)['\"]?\s*$",
610+
r"\bpython3?(?:\.\d+)?(?:\s+-\w+)*\s*<<-?\s*['\"]?(\w+)['\"]?\s*$",
577611
re.MULTILINE,
578612
)
579613
for wf in sorted(WORKFLOWS_DIR.glob("*.yml")):

scripts/netshield_common.py

Lines changed: 167 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,28 @@ def load_fp_set(path="false_positives_set.json"):
366366
try:
367367
with open(path) as f:
368368
data = json.load(f)
369-
for entry in data.get("ips", []):
369+
# FIX BUG-FP-STRICT: 'ips' MUSS eine Liste sein. Vorher genuegte ein
370+
# String wie "1.2.3.4" – data.get("ips", []) lieferte den String,
371+
# die for-Schleife iterierte ueber die Zeichen, und das Set enthielt
372+
# danach {'1', '.', '2', '3', '4'}. Folge: is_in_fp_set('.') == True,
373+
# und beliebige IPs/Substrings wurden faelschlich als False-Positive
374+
# markiert – das FP-Set verfehlt seine Filterfunktion.
375+
# Selbe Fail-Loud-Strategie wie load_whitelist (BUG-WL1-STRICT):
376+
# lieber Workflow-Crash als silent korrumpierter State.
377+
if not isinstance(data, dict):
378+
raise ValueError(
379+
f"'false_positives_set.json' Root ist {type(data).__name__}, "
380+
f"erwartet dict")
381+
ips_field = data.get("ips", [])
382+
if not isinstance(ips_field, list):
383+
raise ValueError(
384+
f"'false_positives_set.json': 'ips' ist {type(ips_field).__name__}, "
385+
f"erwartet list")
386+
for entry in ips_field:
387+
# Nur String-Eintraege akzeptieren – None/int/dict silent skippen
388+
# (Schema-Drift, aber fail-soft pro Entry, nicht pro Datei).
389+
if not isinstance(entry, str):
390+
continue
370391
try:
371392
if "/" in entry:
372393
_fp_networks.append(ipaddress.ip_network(entry, strict=False))
@@ -376,6 +397,10 @@ def load_fp_set(path="false_positives_set.json"):
376397
pass
377398
print(f"false_positives_set.json: {len(_fp_ips)} IPs + {len(_fp_networks)} CIDRs geladen")
378399
except Exception as e:
400+
# Bei Schema-/Parse-Fehler State zurueck auf leer (defensiv – falls
401+
# zwischen Init oben und except hier eine partielle Befuellung lief).
402+
_fp_ips = set()
403+
_fp_networks = []
379404
print(f"WARNUNG: false_positives_set.json nicht lesbar: {e}")
380405
_rebuild_fp_index()
381406
return _fp_ips, _fp_networks
@@ -515,25 +540,35 @@ def parse_entries(text, use_protected_check=False):
515540
if not line or line.startswith('#') or line.startswith(';') or line.startswith('//'):
516541
continue
517542

518-
# FortiGate: "set subnet 1.2.3.4 ..."
519-
fg = re.match(r'set\s+subnet\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', line)
520-
if fg:
521-
ip = fg.group(1)
522-
if ip_check(ip):
523-
entries.add(ip)
524-
continue
525-
526-
# ipset: "add setname 1.2.3.4" oder "add setname 1.2.3.0/24"
527-
ipset_m = re.match(r'add\s+\S+\s+(\S+)', line)
528-
if ipset_m:
529-
val = ipset_m.group(1).split(';')[0].strip()
530-
if '/' in val:
531-
if cidr_check(val):
532-
entries.add(str(ipaddress.ip_network(val, strict=False)))
533-
else:
534-
if ip_check(val):
535-
entries.add(val)
536-
continue
543+
# FIX BUG-IPSET-EAGER: Vorher gab es hier Format-spezifische
544+
# Fast-Pfade fuer FortiGate ("set subnet 1.2.3.4 ...") und ipset
545+
# ("add setname 1.2.3.4"), jeweils mit `continue` am Ende. Beide
546+
# waren gefaehrlich:
547+
#
548+
# 1) ipset: die Regex `add\s+\S+\s+(\S+)` matcht JEDE Zeile mit
549+
# "add " am Anfang – auch Fliesstext wie
550+
# "add notes here 1.2.3.4 important". val wurde dann "here",
551+
# ip_check schlug fehl, und das `continue` schluckte die
552+
# echte IP "1.2.3.4". Datenverlust.
553+
# 2) FortiGate: strenger (verlangt IP direkt nach "set subnet"),
554+
# aber bei Inline-Kommentaren mit zweiter IP
555+
# ("set subnet 1.2.3.4 # backup 8.8.8.8") wurde 8.8.8.8 ebenso
556+
# verworfen – Pfad-Inkonsistenz mit dem dokumentierten
557+
# "Fallback ist Superset"-Vertrag aus FIX BUG-MULTI-ENTRY.
558+
#
559+
# Loesung: Format-spezifische Pfade entfernt. Der untere Fallback
560+
# findet alle IPs/CIDRs in der Zeile per IPV4_RE/CIDR_RE.finditer
561+
# und ist robust gegen alle hier relevanten Formate:
562+
# - "set subnet 1.2.3.4 ..." → IPV4_RE matcht 1.2.3.4
563+
# - "add setname 1.2.3.4" → IPV4_RE matcht 1.2.3.4
564+
# - "add setname 1.2.3.0/24" → CIDR_RE matcht, IP wird per
565+
# cidr_spans nicht doppelt erfasst
566+
# - "1.2.3.4:8080" → IPV4_RE matcht 1.2.3.4
567+
#
568+
# Zu beachten: bei einer ipset-Zeile mit privatem CIDR
569+
# ("add setname 10.0.0.0/8") wird der CIDR korrekt verworfen
570+
# (cidr_check=False), und der Fallback tut nichts mehr – exakt
571+
# gleiches Verhalten wie vorher.
537572

538573
# Inline-Kommentar abschneiden (Spamhaus DROP: "1.2.3.0/24 ; SBLxxx")
539574
line = re.split(r'\s*[;#]', line)[0].strip()
@@ -592,13 +627,44 @@ def parse_entries(text, use_protected_check=False):
592627
# IP innerhalb einer CIDR-Span ueberspringen (= Netzadresse der CIDR)
593628
if any(start <= m.start() < end for start, end in cidr_spans):
594629
continue
630+
# FIX BUG-IPV6-MAPPED: '::ffff:1.2.3.4' und Verwandte (IPv4-mapped
631+
# IPv6) duerfen die '1.2.3.4' nicht als Phantom-Eintrag durchlassen.
632+
# Der Lookbehind '(?<![\d.])' der IPV4_RE laesst ':' als Trenner
633+
# gelten – und matcht damit den IPv4-Suffix einer IPv6-Adresse.
634+
# Heuristik: wenn das whitespace-getrennte Token der Match-
635+
# Position '::' enthaelt oder >=2 Doppelpunkte hat, ist es ein
636+
# IPv6-Token und der IPv4-Suffix wird verworfen. Plain "host:1.2.3.4"
637+
# (1 Doppelpunkt) bleibt unbeeintraechtigt.
638+
if _is_in_ipv6_token(line, m.start(), m.end()):
639+
continue
595640
ip = m.group(1)
596641
if ip_check(ip):
597642
entries.add(ip)
598643

599644
return entries
600645

601646

647+
def _is_in_ipv6_token(line, start, end):
648+
"""True wenn die Position [start:end] in einem whitespace-Token liegt,
649+
das wie eine IPv6-Adresse aussieht (enthaelt '::' oder >=2 ':').
650+
651+
Token-Boundary ist Whitespace; wir scannen rueckwaerts/vorwaerts vom
652+
Match. CSV-Komma und Klammern werden als Token-Begrenzer behandelt,
653+
damit `2001:db8::1,1.2.3.4` zwei Tokens ergibt.
654+
"""
655+
# Token-Boundary: Whitespace + Standard-Trennzeichen die in Feeds
656+
# vorkommen (Komma in CSV, Klammern in JSON-aehnlichen Strings).
657+
boundaries = " \t\n\r,()[]{}\"'"
658+
t_start = start
659+
while t_start > 0 and line[t_start - 1] not in boundaries:
660+
t_start -= 1
661+
t_end = end
662+
while t_end < len(line) and line[t_end] not in boundaries:
663+
t_end += 1
664+
token = line[t_start:t_end]
665+
return "::" in token or token.count(":") >= 2
666+
667+
602668
def parse_entries_for_blacklist(text):
603669
"""Pipeline-Modus-Wrapper um parse_entries(use_protected_check=True).
604670
@@ -860,15 +926,55 @@ def _pinned_getaddrinfo(host, port, *args, **kwargs):
860926

861927

862928
def _pin_host(hostname, ips):
863-
"""Fuegt ein Hostname→IP-Mapping fuer die Dauer des Fetch hinzu."""
929+
"""Setzt ein Hostname→IP-Mapping fuer die Dauer des Fetch und gibt
930+
den vorherigen Pin (oder ein Sentinel) zurueck, damit der Aufrufer
931+
ihn nach dem Fetch wiederherstellen kann.
932+
933+
FIX BUG-PIN-RESTORE: Vorher loeschte _unpin_host das Mapping
934+
unconditionally. Bei einem (theoretisch moeglichen) Redirect auf
935+
denselben Hostname mit aenderndem Pin – oder bei nested fetch_url
936+
auf gleichem Host – fuehrte das dazu, dass der innere Cleanup den
937+
Pin des aeusseren Aufrufs entfernte. Der aeussere lief danach ohne
938+
DNS-Rebind-Schutz weiter. Save-and-restore beseitigt diese Klasse.
939+
940+
Returns:
941+
Der zuvor gespeicherte Pin (Liste) oder das Sentinel _PIN_ABSENT,
942+
wenn vorher kein Mapping existierte.
943+
"""
864944
_install_dns_pin()
865945
if not hasattr(_pin_state, "pin_map"):
866946
_pin_state.pin_map = {}
947+
previous = _pin_state.pin_map.get(hostname, _PIN_ABSENT)
867948
_pin_state.pin_map[hostname] = ips
949+
return previous
950+
951+
952+
def _restore_pin(hostname, previous):
953+
"""Stellt den Pin-Zustand wieder her wie er VOR _pin_host(...) war.
954+
955+
Wenn previous == _PIN_ABSENT war kein Mapping vorhanden → loeschen.
956+
Sonst → ueberschreiben.
957+
"""
958+
pin_map = getattr(_pin_state, "pin_map", None)
959+
if pin_map is None:
960+
return
961+
if previous is _PIN_ABSENT:
962+
pin_map.pop(hostname, None)
963+
else:
964+
pin_map[hostname] = previous
965+
966+
967+
# Sentinel-Wert: unterscheidet "kein vorheriger Pin" von "vorheriger Pin
968+
# war eine leere Liste". Eine leere Liste sollte nie auftreten, weil
969+
# _is_safe_public_host bei 0 IPs None liefert – aber explizit ist sicherer.
970+
_PIN_ABSENT = object()
868971

869972

870973
def _unpin_host(hostname):
871-
"""Entfernt das Mapping nach dem Fetch."""
974+
"""Entfernt das Mapping nach dem Fetch.
975+
976+
FIX BUG-PIN-RESTORE: Behalten fuer Backward-Compat (wird sonst
977+
nirgends mehr in fetch_url benutzt – siehe _restore_pin)."""
872978
pin_map = getattr(_pin_state, "pin_map", None)
873979
if pin_map and hostname in pin_map:
874980
del pin_map[hostname]
@@ -899,7 +1005,11 @@ def fetch_url(url, timeout=30, retries=3, user_agent="NETSHIELD/3.0",
8991005
import urllib.error
9001006
import urllib.parse
9011007

902-
pinned_hosts = [] # fuer finally-Cleanup
1008+
# Liste von (hostname, previous_pin) Paaren fuer finally-Cleanup.
1009+
# FIX BUG-PIN-RESTORE: vorher nur Hostnames; jetzt auch der zuvor
1010+
# gespeicherte Pin, damit nested/redirect-same-host-Faelle den State
1011+
# nicht durcheinanderbringen.
1012+
pinned_hosts = []
9031013

9041014
def _validate(u):
9051015
"""Validiert URL und pinnt den Host auf die geprueften IPs.
@@ -926,8 +1036,8 @@ def _validate(u):
9261036
# gepatcht ist (legacy API), wird nicht gepinnt und der Fetch
9271037
# laeuft ohne Rebind-Schutz weiter (Test-Kontext, kein Risiko).
9281038
if isinstance(safe_ips, list):
929-
_pin_host(parsed.hostname, safe_ips)
930-
pinned_hosts.append(parsed.hostname)
1039+
previous = _pin_host(parsed.hostname, safe_ips)
1040+
pinned_hosts.append((parsed.hostname, previous))
9311041
return True
9321042

9331043
if not _validate(url):
@@ -967,19 +1077,38 @@ def redirect_request(self, req, fp, code, msg, headers, newurl):
9671077
# FIX GZIP: Transparent gepackte Feeds (.gz) dekomprimieren.
9681078
# Erkennung über Magic-Bytes (\x1f\x8b) – URL-unabhängig, greift
9691079
# auch wenn Server kein .gz-Suffix in der URL hat oder gzip per
970-
# Content-Encoding ausliefert. Limit-Check nach Decompression
971-
# erneut: gzip kann 10–20x expandieren (zip-bomb-Schutz).
1080+
# Content-Encoding ausliefert.
1081+
#
1082+
# FIX BUG-GZIP-BOMB: gzip.decompress(data) lud das KOMPLETTE
1083+
# expandierte Ergebnis in den Speicher, BEVOR der Limit-Check
1084+
# griff. 25 MB komprimiert koennen auf mehrere GB expandieren
1085+
# → OOM des Runners. Der alte Kommentar "zip-bomb-Schutz"
1086+
# stimmte nicht – der Schutz wirkte erst nach der Allokation.
1087+
# Jetzt: streaming via GzipFile mit harter read(read_limit + 1)
1088+
# Grenze. Wenn der Stream mehr liefern wuerde, wird er
1089+
# abgeschnitten und der Fetch failt fail-loud.
9721090
if data[:2] == b"\x1f\x8b":
1091+
import gzip as _gzip
1092+
import io as _io
9731093
try:
974-
import gzip as _gzip
975-
data = _gzip.decompress(data)
976-
except (OSError, MemoryError) as _gz_err:
1094+
with _gzip.GzipFile(fileobj=_io.BytesIO(data)) as _gz:
1095+
# +1 Byte um Truncation zuverlaessig zu erkennen
1096+
decompressed = _gz.read(read_limit + 1)
1097+
except (OSError, EOFError, MemoryError) as _gz_err:
9771098
print(f" FEHLER gzip-Dekomprimierung {url}: {_gz_err}")
9781099
return None
979-
if len(data) > read_limit:
980-
print(f" WARNUNG {url}: gzip-dekomprimiert > {read_limit} "
981-
f"bytes – getrimmt (mögliche zip-bomb)")
982-
data = data[:read_limit]
1100+
if len(decompressed) > read_limit:
1101+
# Streaming-Limit erreicht: behandle wie zip-bomb –
1102+
# KEINE getrimmte Auslieferung wie bei nicht-gzip
1103+
# Truncation, weil eine teilweise dekomprimierte
1104+
# Datei stark verzerrt sein kann (mitten im Eintrag
1105+
# abgeschnitten, oder kuenstlich aufgeblaeht durch
1106+
# einen boesartigen Stream). Lieber abbrechen.
1107+
print(f" FEHLER {url}: gzip-Stream > {read_limit} "
1108+
f"bytes nach Dekomprimierung – moegliche "
1109+
f"zip-bomb, Fetch verworfen")
1110+
return None
1111+
data = decompressed
9831112
return data.decode("utf-8", errors="ignore")
9841113
except urllib.error.HTTPError as e:
9851114
retryable = e.code in TRANSIENT_CODES or (e.code == 404 and _host_is_gh_raw)
@@ -1014,8 +1143,10 @@ def redirect_request(self, req, fp, code, msg, headers, newurl):
10141143
finally:
10151144
# FIX DNS-REBIND: Pin-Mapping wieder entfernen damit spaetere
10161145
# Aufrufe mit anderen URLs nicht gestale IPs bekommen.
1017-
for h in pinned_hosts:
1018-
_unpin_host(h)
1146+
# FIX BUG-PIN-RESTORE: Restore in umgekehrter Reihenfolge des Pinnens
1147+
# (LIFO), damit verschachtelte Pins korrekt aufgeloest werden.
1148+
for h, previous in reversed(pinned_hosts):
1149+
_restore_pin(h, previous)
10191150

10201151

10211152
# ═══════════════════════════════════════════════════════════════

0 commit comments

Comments
 (0)