Skip to content

Workflow Health Checker #424

Workflow Health Checker

Workflow Health Checker #424

name: Workflow Health Checker
on:
schedule:
- cron: '15 1 * * *' # 01:15 UTC (15 Min nach Feed Health Monitor 01:00)
- cron: '15 7 * * *' # 07:15 UTC (nach Score Decay 07:00; VOR Geo-Tagger 07:45 – Geo-Status erst ab 13:15 sichtbar)
- cron: '15 13 * * *' # 13:15 UTC (Tagesmitte – Pipeline-Stillstand erkennen)
- cron: '15 19 * * *' # 19:15 UTC (Abend – vor HoneyDB 22:15 + Combined 00:00)
workflow_dispatch:
env:
FORCE_JAVASCRIPT_ACTIONS_TO_NODE24: true
permissions:
contents: write
actions: read
issues: none
packages: none
pull-requests: none
security-events: none
concurrency:
group: ${{ github.workflow }}
cancel-in-progress: false
jobs:
check-python-code:
runs-on: ubuntu-latest
timeout-minutes: 20
steps:
- name: Checkout Repository
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
persist-credentials: true
- name: Check Python Code in all Workflow Files
env:
# FIX #10: Workflow-Dateiname aus github-Kontext ableiten statt hartcodiert.
# Verhindert stilles Fehlverhalten bei Umbenennung der Datei.
WORKFLOW_SELF: ${{ github.workflow_ref }}
run: |
python3 << 'EOF'
import sys as _sys; _sys.path.insert(0, "scripts")
from netshield_common import (
load_whitelist, load_fp_set, is_in_fp_set,
is_valid_public_ipv4, is_valid_public_cidr,
is_protected_entry, is_whitelisted,
parse_entries as _parse_entries, calculate_confidence,
safe_get_date, parse_date, sort_ips, write_ip_list,
write_json_atomic, write_text_atomic,
fetch_url, check_local_feed_age,
IPV4_RE, CIDR_RE, TIMESTAMP_RE,
)
# Init: Whitelist + FP-Set laden
load_whitelist()
load_fp_set()
import re, textwrap, os, json, ast, sys, shlex, ipaddress
from datetime import datetime, timezone
now = datetime.now(timezone.utc)
now_str = now.strftime("%Y-%m-%d %H:%M UTC")
WORKFLOWS_DIR = ".github/workflows"
if not os.path.isdir(WORKFLOWS_DIR):
print(f"WARNUNG: {WORKFLOWS_DIR} nicht gefunden – kein Fallback auf Root.")
write_text_atomic("workflow_health_report.md", "# Workflow Health Checker\n\n*Workflows-Verzeichnis nicht gefunden.*\n")
exit(0)
files = sorted(f for f in os.listdir(WORKFLOWS_DIR) if f.endswith(".yml"))
print(f"Workflows gefunden: {len(files)}\n")
errors = []
warnings = []
results = {}
# ── Check 1: Cron-Kollisionen & doppelte Namen ────────────────────
# Cron-Regex matcht nur echte Cron-Expressions (Ziffern, *, /, ,)
# und explizit NICHT den eigenen Dateinamen (verhindert Self-Match)
# FIX #10: SELF aus WORKFLOW_SELF-Env-Var ableiten.
# Format von github.workflow_ref: "owner/repo/.github/workflows/name.yml@refs/heads/main"
import os as _os
_wf_ref = _os.environ.get("WORKFLOW_SELF", "")
_wf_part = _wf_ref.split(".github/workflows/")[-1].split("@")[0].strip()
SELF = _wf_part if _wf_part.endswith(".yml") else "workflow_health_checker.yml"
print(f"SELF erkannt als: {SELF}")
required_protection = {
"update_combined_blacklist.yml": ["is_protected_entry", "whitelist.json"],
"update_confidence_blacklist.yml": ["is_protected_entry", "whitelist.json"], # FIX: SSOT-Migration wie update_combined_blacklist.yml
# FIX: false_positive_checker.yml lädt Whitelist aus whitelist.json.
# Literal-CIDRs stehen nicht mehr inline → Token-Check auf Dateiname.
"false_positive_checker.yml": ["whitelist.json", "is_whitelisted"],
}
cron_re = re.compile(r"cron:\s*'([\d\s*/,]+)'")
cron_map = {}
name_map = {}
def _expand_simple_field(field, min_v, max_v):
field = field.strip()
if field == "*":
return list(range(min_v, max_v + 1))
vals = set()
for part in field.split(','):
part = part.strip()
if not part:
continue
if part.startswith('*/'):
step = int(part[2:])
vals.update(range(min_v, max_v + 1, step))
elif part.isdigit():
v = int(part)
if min_v <= v <= max_v:
vals.add(v)
return sorted(vals)
def _cron_minutes_of_day(expr):
parts = expr.split()
if len(parts) != 5:
return []
minute_f, hour_f, dom_f, month_f, dow_f = parts
# Weekly-Crons nicht stillschweigend verwerfen:
# Für den Puffer-Check genügt eine konservative Tageszeit-Betrachtung.
# month_f bleibt eingeschränkt; dom_f / dow_f werden bewusst ignoriert.
if month_f != '*':
return []
minutes = _expand_simple_field(minute_f, 0, 59)
hours = _expand_simple_field(hour_f, 0, 23)
return sorted({h * 60 + m for h in hours for m in minutes})
def _fmt_minute_of_day(v):
return f"{v // 60:02d}:{v % 60:02d} UTC"
def _code_only(line):
return line.split('#', 1)[0].rstrip()
def _extract_git_added_files(shell_text):
added = []
for raw_line in shell_text.splitlines():
line = _code_only(raw_line).strip()
if not line.startswith("git add "):
continue
try:
parts = shlex.split(line)
except Exception:
parts = line.split()
for token in parts[2:]:
if token in {"2>/dev/null", "||", "true"}:
continue
if token.startswith("-"):
continue
added.append(token)
return added
def _report_like_paths(paths):
hits = []
for path in paths:
p = path.strip().strip("\\")
low = p.lower()
if any(tok in low for tok in [
"report", "status", "result", "summary", ".md", ".json"
]):
hits.append(p)
return hits
def _find_guard_exits(dedented):
guard_exits = []
lines = dedented.splitlines()
for idx, raw_line in enumerate(lines):
line = _code_only(raw_line).strip()
if not line:
continue
exit_match = re.search(r'(?:sys\.exit|raise\s+SystemExit)\((\d+)\)', line)
if not exit_match:
continue
context_lines = [_code_only(l) for l in lines[max(0, idx-14):idx+1]]
context = " ".join(x for x in context_lines if x.strip())
min_vars = sorted(set(re.findall(r'\bMIN_[A-Z0-9_]+\b', context)))
if not min_vars:
continue
guard_exits.append({
"line": idx,
"exit_code": int(exit_match.group(1)),
"mins": min_vars,
"context": context,
"lines_before": lines[:idx],
})
return guard_exits
def _has_prior_report_write(lines_before, report_paths):
if not report_paths:
return True
prior = "\n".join(lines_before)
# Variablen-Aliases ermitteln: z.B. REPORT_FILE = "bot_detector_report.md"
# → damit wird auch "with open(REPORT_FILE, 'w')" als Prior-Write erkannt
# wenn der resolved Wert in report_paths enthalten ist.
var_aliases = set()
for raw_line in lines_before:
line = _code_only(raw_line).strip()
for m in re.finditer(r'\b([A-Z_][A-Z0-9_]*)\s*=\s*(["\'])([^"\']+)\2', line):
if m.group(3).strip() in report_paths:
var_aliases.add(m.group(1))
# FIX CHK21: Atomic-Writer aus netshield_common ebenfalls als
# Prior-Write anerkennen. Die Codebasis schreibt Reports/Status-
# Dateien ausschliesslich ueber write_text_atomic / write_json_atomic
# (Crash-Sicherheit). Ohne diese Erweiterung wurde jeder guard-exit
# hinter einem atomaren Write faelschlicherweise als "stale Report
# moeglich" gemeldet (z.B. auto_feed_discovery, honeydb, honeypot,
# geo_tagger, cve_to_ip_mapper).
atomic_writers = r'(?:write_text_atomic|write_json_atomic|write_ip_list)'
for report_path in report_paths:
var_match = re.fullmatch(r'[A-Z_][A-Z0-9_]*', report_path)
if var_match:
pat = rf'with open\({re.escape(report_path)}\s*,\s*["\']w["\']'
atomic_pat = rf'{atomic_writers}\(\s*{re.escape(report_path)}\s*,'
else:
pat = rf'with open\((?:r?f?["\'])[^\n]*{re.escape(report_path)}[^\n]*["\']\s*,\s*["\']w["\']'
atomic_pat = rf'{atomic_writers}\(\s*(?:r|f|rf|fr)?["\'][^"\']*{re.escape(report_path)}[^"\']*["\']\s*,'
if re.search(pat, prior):
return True
if re.search(atomic_pat, prior):
return True
# Zusatz: auch über Variablen-Alias suchen (REPORT_FILE → "bot_detector_report.md")
for alias in var_aliases:
if re.search(rf'with open\({re.escape(alias)}\s*,\s*["\']w["\']', prior):
return True
if re.search(rf'{atomic_writers}\(\s*{re.escape(alias)}\s*,', prior):
return True
return False
def _extract_report_targets_from_block(dedented):
targets = set()
var_values = {}
lines = dedented.splitlines()
for raw_line in lines:
line = _code_only(raw_line).strip()
if not line:
continue
for m in re.finditer(r'\b([A-Z_][A-Z0-9_]*)\s*=\s*(["\'])([^"\']+)\2', line):
value = m.group(3).strip()
low = value.lower()
if any(tok in low for tok in ["report", "status", "result", "summary", ".md", ".json"]):
var_values[m.group(1)] = value
open_match = re.search(r'with\s+open\((.+?),\s*["\']w["\']', line)
# FIX CHK21: Auch atomare Writer aus netshield_common erfassen.
# Ohne diese Ergaenzung blieb die Target-Liste bei vielen
# Workflows leer, obwohl Report-Dateien committet werden.
atomic_match = re.search(
r'\b(?:write_text_atomic|write_json_atomic|write_ip_list)\(\s*(.+?)\s*(?:,|\))',
line,
)
match = open_match or atomic_match
if not match:
continue
raw_target = match.group(1).strip()
if re.fullmatch(r'[A-Z_][A-Z0-9_]*', raw_target):
resolved = var_values.get(raw_target)
if resolved:
# Variablenname bekannt und aufgelöst → resolved Wert nehmen
targets.add(resolved)
else:
# Variablenname unbekannt → nur aufnehmen wenn Name selbst
# auf Report-/Status-Datei hindeutet (REPORT_FILE, STATUS_FILE).
# OUT_FILE, BLACKLIST_FILE u.ä. werden so korrekt ausgeschlossen.
low_var = raw_target.lower()
if any(tok in low_var for tok in ["report", "status", "result", "summary"]):
targets.add(raw_target)
continue
lit = re.match(r'(?:r|f|rf|fr)?["\']([^"\']+)["\']$', raw_target)
if lit:
value = lit.group(1).strip()
low = value.lower()
if any(tok in low for tok in ["report", "status", "result", "summary", ".md", ".json"]):
targets.add(value)
return sorted(targets)
def _extract_report_targets_from_blocks(blocks):
targets = set()
for blk in blocks:
targets.update(_extract_report_targets_from_block(textwrap.dedent(blk)))
return sorted(targets)
def _has_commit_step_always(content):
lines = content.splitlines()
for idx, raw_line in enumerate(lines):
if re.match(r'^\s*-\s+name:\s+.*commit', raw_line, re.I):
window = "\n".join(lines[idx:idx+8])
if "if: always()" in window:
return True
return "if: always()" in content
expected_sequence = [
("feed_health_monitor.yml", "workflow_health_checker.yml", 15),
("honeydb_monitor.yml", "update_bot_detector.yml", 15),
("update_bot_detector.yml", "honeypot_monitor.yml", 15),
("honeypot_monitor.yml", "update_combined_blacklist.yml", 60),
("update_combined_blacklist.yml", "update_confidence_blacklist.yml", 45),
("score_decay_monitor.yml", "geo_tagger.yml", 45),
]
continue_on_error_allowlist = {
"update_combined_blacklist.yml", # AbuseIPDB Rate-Limit ist absichtlich non-blocking
}
for fname in files:
content = open(f"{WORKFLOWS_DIR}/{fname}").read()
if fname in required_protection:
missing = [token for token in required_protection[fname] if token not in content]
if missing:
errors.append({
"file": fname,
"check": "Hardening-Guard fehlt",
"detail": f"Fehlende Schutz-Tokens: {', '.join(missing)}",
})
for cron in cron_re.findall(content):
cron = cron.strip()
cron_map.setdefault(cron, []).append(fname)
m = re.search(r"^name:\s*(.+)", content, re.M)
if m:
name_map.setdefault(m.group(1).strip(), []).append(fname)
for cron, wfs in cron_map.items():
if len(wfs) > 1:
warnings.append({
"file": " + ".join(wfs),
"check": "Cron-Kollision",
"detail": f"'{cron}' – beide Workflows laufen gleichzeitig",
})
for name, wfs in name_map.items():
if len(wfs) > 1:
errors.append({
"file": " + ".join(wfs),
"check": "Doppelter Workflow-Name",
"detail": f'name: "{name}" – concurrency-group schlägt fehl, Runs blockieren sich',
})
# ── Check 1b: Erwartete Workflow-Reihenfolge / Mindestpuffer ─────
file_crons = {fname: cron_re.findall(open(f"{WORKFLOWS_DIR}/{fname}").read()) for fname in files}
for _src, _dst, _min_gap in expected_sequence:
src_minutes = []
dst_minutes = []
for _expr in file_crons.get(_src, []):
src_minutes.extend(_cron_minutes_of_day(_expr.strip()))
for _expr in file_crons.get(_dst, []):
dst_minutes.extend(_cron_minutes_of_day(_expr.strip()))
src_minutes = sorted(set(src_minutes))
dst_minutes = sorted(set(dst_minutes))
if not src_minutes or not dst_minutes:
continue
bad_pairs = []
for _s in src_minutes:
# FIX BUG-5: >= statt >, gleiches Pattern wie unten in der
# _dependency_chain-Schleife. Der separate Cron-Kollisions-Check
# (Z. ~331) sieht nur exakt gleiche Cron-Strings als Kollision —
# '0 */6' und '0 */3' sind aber unterschiedliche Strings, die
# sich an Minute 0/360/720/1080 ueberlappen. Ohne >= wuerde
# diese Schleife solche Δ=0-Kollisionen ebenfalls verfehlen.
later = [_d for _d in dst_minutes if _d >= _s]
if later:
_next = later[0]
if (_next - _s) < _min_gap:
bad_pairs.append(f"{_fmt_minute_of_day(_s)} → {_fmt_minute_of_day(_next)} ({_next - _s}min < {_min_gap}min)")
if bad_pairs:
warnings.append({
"file": f"{_src} → {_dst}",
"check": "Workflow-Reihenfolge / Puffer zu knapp",
"detail": "; ".join(bad_pairs[:5]),
})
# ── Pro Datei ─────────────────────────────────────────────────────
for fname in files:
# Eigene Datei nicht prüfen: der Health Checker läuft innerhalb des Runners
# und kann sich nicht selbst syntaxprüfen ohne False Positives zu erzeugen.
if fname == SELF:
results[fname] = {"blocks": 0, "errors": [], "warnings": [], "cron": []}
print(f"⏭ {fname}: übersprungen (eigene Datei)")
continue
path = f"{WORKFLOWS_DIR}/{fname}"
content = open(path).read()
file_errors = []
file_warnings = []
# Python-Blöcke aus YAML-Heredocs extrahieren
# FIX CHK-PYEOF: auch python3 - << 'PYEOF' Blöcke erkennen
# (z.B. seen_db_meta.json Writer in update_combined_blacklist.yml)
blocks = re.findall(r"python3 << 'EOF'(.*?)\n\s*EOF\b", content, re.DOTALL)
blocks += re.findall(r"python3 - << 'PYEOF'(.*?)\n\s*PYEOF\b", content, re.DOTALL)
for i, block in enumerate(blocks):
dedented = textwrap.dedent(block)
# ── Check 2: Python-Syntax ─────────────────────────────────
try:
compile(dedented, f"{fname}:block{i}", "exec", ast.PyCF_ONLY_AST)
except SyntaxError as e:
msg = f"Block {i} Zeile {e.lineno}: SyntaxError – {e.msg} → `{(e.text or '').strip()}`"
file_errors.append(msg)
errors.append({"file": fname, "check": "SyntaxError", "detail": msg})
continue
# ── Check 3: sys.exit ohne sys importiert ─────────────────
# Erkennt auch Multi-Imports: "import os, re, sys"
sys_imported = bool(re.search(r'\bimport\b[^#\n]*\bsys\b', dedented))
if "sys.exit" in dedented and not sys_imported:
msg = f"Block {i}: sys.exit() verwendet aber sys nicht importiert → NameError"
file_errors.append(msg)
errors.append({"file": fname, "check": "Fehlender Import", "detail": msg})
# ── Check 4: open(..., "w") ohne encoding ─────────────────
# Nur für text-writes (f.write), nicht für json.dump (schreibt nur ASCII)
# FIX CHK4: Kommentarzeilen ausschließen – open() in # ... Kommentaren
# sind kein echter Code und dürfen keinen False Positive erzeugen.
_dedented_code = "\n".join(
l for l in dedented.splitlines()
if not l.lstrip().startswith("#")
)
for m in re.finditer(r'open\([^)]+,\s*["\']w["\']\s*\)', _dedented_code):
# FIX CHK4b: Zeilen wie msg=f"...open(...,'w')..." sind String-Literale, kein echter open()-Aufruf.
_ml = _dedented_code[_dedented_code.rfind('\n', 0, m.start())+1:]
_ml = _ml[:_ml.find('\n')] if '\n' in _ml else _ml
if re.match(r'\s*(?:msg|_msg|file_\w+)\s*', _ml):
continue
snippet = _dedented_code[max(0, m.start()-30):m.end()+60]
if "encoding" not in snippet and "json.dump" not in snippet:
msg = f"Block {i}: open(..., 'w') ohne encoding= – UnicodeEncodeError möglich"
file_warnings.append(msg)
warnings.append({"file": fname, "check": "Encoding fehlt", "detail": msg})
break
# ── Check 5: json.load ohne try/except ────────────────────
if "json.load(" in dedented and "try:" not in dedented:
msg = f"Block {i}: json.load() ohne try/except – crash bei korrupter Datei"
file_warnings.append(msg)
warnings.append({"file": fname, "check": "Fehlende Fehlerbehandlung", "detail": msg})
# ── Check 6: del in aktiver for-Schleife (Dict-Mutation) ──
# FIX CHK6d: Variablen-abhängiger Match.
# Nur melden, wenn DIESELBE Variable in for-Schleife iteriert UND
# via `del` mutiert wird. Vorher wurde jede unsichere for-Iteration
# (inkl. harmloser Listen) mit jedem beliebigen `del x[...]` im
# selben Block als Fehler gemeldet – False Positive bei z.B.
# `for e in _WHITELIST_ENTRIES:` (Liste) + `del other_dict[k]`.
# "for x in list(d.keys())" – SICHER → kein Match
# "for x in list(d)" – SICHER → kein Match
# "for x in d.keys()" + "del d[k]" – UNSICHER → Fehler
# "for x in d" + "del d[k]" – UNSICHER → Fehler
# "for x in other" + "del d[k]" – SICHER → kein Match (verschiedene Namen)
code_lines = [l for l in dedented.splitlines()
if l.strip() and not l.strip().startswith('#')]
code_only = '\n'.join(code_lines)
_unsafe_for_matches = re.findall(
r'^\s*for \w+ in (?!list\()([A-Za-z_]\w*)(?:\.keys\(\))?',
code_only, re.MULTILINE
)
_del_targets = set(re.findall(r'\bdel (\w+)\[', code_only))
_conflicts = [v for v in _unsafe_for_matches if v in _del_targets]
if _conflicts:
msg = (f"Block {i}: 'del' in for-Schleife ohne list()-Kopie "
f"({_conflicts[0]}) – RuntimeError bei Dict-Größenänderung")
file_errors.append(msg)
errors.append({"file": fname, "check": "Dict-Mutation in Schleife", "detail": msg})
# ── Check 7: ThreadPoolExecutor ohne timeout ───────────────
for m in re.finditer(r'as_completed\(futures\)', dedented):
snippet = dedented[m.start():m.start()+60]
if "timeout=" not in snippet:
msg = f"Block {i}: as_completed(futures) ohne timeout= – hängt bei totem Feed ewig"
file_warnings.append(msg)
warnings.append({"file": fname, "check": "as_completed ohne timeout", "detail": msg})
break
# ── Check 8: git push ohne Retry-Schleife (Shell-Teil) ───────────
# Korrekte Implementierung: for-Schleife mit mehreren Versuchen + git push darin.
# Falsch-Positiv-Schutz: "rebase" allein reicht nicht –
# "git pull --rebase && git push" ohne Schleife ist genauso fehleranfällig.
#
# FIX False Positive: Python-Heredocs (python3 << 'EOF'...EOF) vor der
# Shell-Suche entfernen – sonst triggert "git push" als Python-String-
# Literal (z.B. if "git push" in part) fälschlicherweise den Check.
content_shell_only = re.sub(
r"python3 << 'EOF'.*?\n\s*EOF\b", "", content, flags=re.DOTALL
)
# FIX CHK26: Kommentare (YAML '#...' und Shell '#...') entfernen,
# sonst triggert z.B. ein Kommentar "# nicht per git push, sondern
# via API" den persist-credentials-Check (dependabot-auto-merge.yml).
# Hinweis: String-Literale mit '#' koennen ausnahmsweise ebenfalls
# gestripped werden – fuer 'git push'/'git commit'-Suche akzeptabel,
# da diese Kommandos nicht als String-Literal sinnvoll erscheinen.
content_shell_no_comments = re.sub(
r"(^|\s)#[^\n]*", r"\1", content_shell_only
)
shell_blocks = re.findall(
r"run: \|(.*?)(?=\n - name:|\n [a-z]|\Z)",
content_shell_only, re.DOTALL
)
for part in shell_blocks:
if "git push" in part:
has_retry_loop = bool(re.search(r'for attempt in \d', part))
if not has_retry_loop:
msg = "git push ohne Retry-Schleife – Race-Condition bei parallelen Runs (kein 'for attempt in ...')"
file_warnings.append(msg)
warnings.append({"file": fname, "check": "Git Push ohne Retry-Schleife", "detail": msg})
break
# ── Check 9: concurrency fehlt bei scheduled Workflow ─────────
if "schedule" in content and "concurrency:" not in content:
msg = "Kein concurrency-Block – parallele Runs möglich bei manuell + scheduled gleichzeitig"
file_warnings.append(msg)
warnings.append({"file": fname, "check": "Concurrency fehlt", "detail": msg})
# ── Check 10: sys.exit(0) im Leerungsschutz ──────────────────
# Leerungsschutz-Exits müssen sys.exit(1) verwenden damit GitHub
# Actions den Step als fehlgeschlagen markiert. exit(0) versteckt
# das Problem still.
# AUSNAHME: exit(0) ist korrekt bei Feed-Netzwerkfehlern
# ("nicht erreichbar"), User-Fehlern (community_ip_report) und
# fehlenden Konfigurationsdateien.
_leer_kws = {"MIN_ENTRIES", "MIN_ABUSEIPDB", "Leerungsschutz"}
_ok_kws = {"nicht erreichbar", "Versuchen fehlgeschlagen",
"RATE_LIMITED", "Rate-Limit", # Rate-Limit: intentionales exit(0) mit continue-on-error (z.B. AbuseIPDB)
"NO_IP_FOUND", "INVALID_IP",
"WORKFLOWS_DIR", "korrupt", "nicht lesbar",
"FP-Liste", "False-Positive"}
for _bi, _blk in enumerate(blocks):
_lines = _blk.splitlines()
for _j, _ln in enumerate(_lines):
_s = _ln.strip()
if _s.startswith("#"):
continue
if "sys.exit(0)" in _s or _s == "exit(0)":
_ctx = " ".join(_lines[max(0,_j-8):_j])
_is_leer = any(kw in _ctx for kw in _leer_kws)
_is_ok = any(kw in _ctx for kw in _ok_kws)
if _is_leer and not _is_ok:
# FIX CHK10b: exit(0) mit "continue-on-error"-Kommentar
# in derselben Zeile ist intentionell – kein Fehler melden.
_exit0_ctx = _lines[_j] if _j < len(_lines) else ""
if "continue-on-error" in _exit0_ctx:
continue
msg = (f"Block {_bi} L~{_j}: sys.exit(0) im Leerungsschutz – "
f"soll sys.exit(1) sein damit GH-UI Fehler anzeigt")
file_errors.append(msg)
errors.append({"file": fname, "check": "exit(0) im Leerungsschutz",
"detail": msg})
# ── Check 11: ::warning Annotation vor Leerungsschutz-exit ───
# Jeder Leerungsschutz-Exit sollte vorher eine GH-Annotation
# print(f"::warning file=...::...") ausgeben damit das Problem
# im GitHub-Actions-UI sichtbar wird ohne den Log zu öffnen.
_guarded_files = {
"honeydb_monitor.yml", "honeypot_monitor.yml",
"update_bot_detector.yml", "update-blocklist.yml",
"update_combined_blacklist.yml", "geo_tagger.yml",
"asn_reputation_scorer.yml", "cve_to_ip_mapper.yml",
"update_confidence_blacklist.yml", "false_positive_checker.yml",
"auto_feed_discovery.yml", "feed_health_monitor.yml",
}
if fname in _guarded_files:
has_warning_annot = f"::warning file={fname}::" in content
has_leer = any(kw in content for kw in _leer_kws)
if has_leer and not has_warning_annot:
msg = (f"Leerungsschutz vorhanden aber keine "
f"'::warning file={fname}::' Annotation – "
f"Ausfall nur im Job-Log sichtbar")
file_warnings.append(msg)
warnings.append({"file": fname,
"check": "GH Warning Annotation fehlt",
"detail": msg})
# ── Check 12: false_positive_checker Cache-Key ────────────────
# Der FP-Checker muss seinen Cache-Save in einen eigenen Slot
# (netshield-seen-db-fp-) schreiben, NICHT in v2. Andernfalls
# überschreibt er den frisch generierten Combined-Cache und
# erzeugt eine Race-Condition.
if fname == "false_positive_checker.yml":
# FIX CHK12: Save-Key des FP-Checkers muss den fp-Prefix enthalten,
# sonst ueberschreibt er den Combined-Cache (Race-Condition).
# Alle Cache-Save-Bloecke extrahieren und Key auf fp-Prefix pruefen.
save_blocks = re.findall(
r'- name: Save seen_db Cache.*?key:\s*([^\n]+)',
content, re.DOTALL
)
for _key in save_blocks:
_key = _key.strip()
if "netshield-seen-db-" in _key and "fp-" not in _key:
msg = (f"Cache Save-Key verwendet nicht den fp-Prefix "
f"– FP-Checker wuerde Combined-Cache ueberschreiben "
f"(Race-Condition): {_key}")
file_errors.append(msg)
errors.append({"file": fname,
"check": "Cache-Key Race-Condition",
"detail": msg})
# ── Check 13: MIN_ENTRIES-Schwellen zu niedrig ────────────────
# ── Check 13: Mindestschwellen / Leerungsschutz-Thresholds ───
# Prüft bekannte Guard-Variablen pro Workflow auf zu niedrige Werte.
_threshold_specs = {
"honeydb_monitor.yml": {
"MIN_ENTRIES": 500,
},
"update-blocklist.yml": {
"MIN_ENTRIES": 240_000,
},
"cve_to_ip_mapper.yml": {
"MIN_CVE_ENTRIES": 100,
},
"honeypot_monitor.yml": {
"MIN_ENTRIES": 500,
},
"asn_reputation_scorer.yml": {
"MIN_ASN_RESULTS": 5,
},
"geo_tagger.yml": {
"MIN_GEO_MATCHED": 100,
},
"update_bot_detector.yml": {
"MIN_ENTRIES": 50,
},
"update_combined_blacklist.yml": {
"MIN_ABUSEIPDB": 100,
},
"auto_feed_discovery.yml": {
"MIN_APPROVED_FEEDS": 10,
},
}
if fname in _threshold_specs:
for _var, _expected in _threshold_specs[fname].items():
_found = False
_rx = re.compile(rf'\b{re.escape(_var)}\s*=\s*([\d_]+)')
for _m in _rx.finditer(content):
_found = True
_val = int(_m.group(1).replace("_", ""))
if _val < _expected:
msg = (f"{_var}={_val:,} ist zu niedrig "
f"(Minimum für {fname}: {_expected:,})")
file_warnings.append(msg)
warnings.append({"file": fname,
"check": f"{_var} zu niedrig",
"detail": msg})
if not _found:
msg = (f"{_var} nicht gefunden – "
f"Leerungsschutz-Regressionscheck unvollständig")
file_warnings.append(msg)
warnings.append({"file": fname,
"check": f"{_var} fehlt",
"detail": msg})
# ── Check 14: continue-on-error sichtbar machen ──────────────
if "continue-on-error: true" in content and fname not in continue_on_error_allowlist:
msg = ("continue-on-error=true gesetzt – Fehler können still bleiben; "
"Review ob das wirklich beabsichtigt ist")
file_warnings.append(msg)
warnings.append({"file": fname,
"check": "continue-on-error aktiv",
"detail": msg})
# ── Check 15: Externer Feed ohne expliziten Leerungsschutz ───
# Heuristik: Workflow lädt externe URLs und schreibt Listen/Reports,
# aber enthält keine MIN_*-Schwelle. Das ist kein harter Fehler,
# sondern ein Hinweis auf mögliche Datenqualitäts-Lücken.
_has_external_fetch = (
"urllib.request.urlopen" in content
or "https://" in content
or "http://" in content
)
_writes_outputs = any(tok in content for tok in [
".txt", ".md", "OUT_FILE", "REPORT_FILE"
])
_has_min_guard = bool(re.search(r'\bMIN_[A-Z0-9_]+\b', content))
_quality_guard_exempt = {
"feed_health_monitor.yml",
"workflow_health_checker.yml",
"netshield_report_generator.yml",
"community_ip_report.yml",
"auto_feed_discovery.yml", # hat MIN_APPROVED_FEEDS guard
}
if _has_external_fetch and _writes_outputs and not _has_min_guard and fname not in _quality_guard_exempt:
msg = ("Externer Feed/Report ohne explizite MIN_* Guard-Variable – "
"Müll-/Leer-Daten könnten unbemerkt akzeptiert werden")
file_warnings.append(msg)
warnings.append({"file": fname,
"check": "Kein expliziter Leerungsschutz",
"detail": msg})
# ── Check 16: urllib.urlopen ohne timeout= ───────────────────
# urlopen ohne timeout= hängt unendlich bei toten Feeds / Netzwerkproblemen.
# Jeder externe HTTP-Aufruf MUSS einen expliziten timeout haben.
for _bi, _blk in enumerate(blocks):
for _m in re.finditer(r'urlopen\(([^)]+)\)', _blk):
_args = _m.group(1)
if 'timeout=' not in _args:
msg = (f"Block {_bi}: urllib.urlopen() ohne timeout= – "
f"hängt ewig bei totem Host/Netzwerkausfall")
file_warnings.append(msg)
warnings.append({"file": fname,
"check": "urlopen ohne timeout",
"detail": msg})
break
# ── Check 17: open(..., "a") ohne encoding= ──────────────────
# Append-Mode kann ebenso wie Write-Mode zum UnicodeEncodeError
# führen wenn Locale nicht UTF-8 ist.
# FIX CHK17: Kommentarzeilen ausschließen (analog Fix CHK4).
_open_append_pat = re.compile(r"open\([^)]+,\s*['\"]{1}a['\"]{1}\s*\)")
for _bi, _blk in enumerate(blocks):
_blk_code = "\n".join(
l for l in _blk.splitlines()
if not l.lstrip().startswith("#")
)
for _m in _open_append_pat.finditer(_blk_code):
# FIX CHK17b: String-Literale in msg=f"...open(...,'a')..."-Zeilen ausschließen.
_aml = _blk_code[_blk_code.rfind('\n', 0, _m.start())+1:]
_aml = _aml[:_aml.find('\n')] if '\n' in _aml else _aml
if re.match(r'\s*(?:msg|_msg|file_\w+)\s*', _aml):
continue
_snip = _blk_code[max(0, _m.start()-30):_m.end()+60]
if 'encoding' not in _snip:
msg = (f"Block {_bi}: open(..., 'a') ohne encoding= – "
f"UnicodeEncodeError möglich bei non-ASCII")
file_warnings.append(msg)
warnings.append({"file": fname,
"check": "Encoding fehlt (append)",
"detail": msg})
break
# ── Check 18: sys.exit() ohne vorherige Fehlermeldung ────────
# FIX CHK18b: Fenster auf 8 Zeilen erweitert (war 5 – zu eng bei
# mehrzeiligen open()-Blöcken vor sys.exit). Nur echte Code-Zeilen
# (keine Kommentare) im Kontext-Fenster zählen.
for _bi, _blk in enumerate(blocks):
_lines = _blk.splitlines()
for _j, _ln in enumerate(_lines):
_s = _ln.strip()
if _s.startswith('#'):
continue
# FIX CHK18c: nur echte sys.exit(1)-Statements, keine String-Literale.
# Zeile muss nach sys.exit(1) enden (ggf. Kommentar), kein weiterer Kontext.
# FIX CHK18d: msg=f"...sys.exit(1)..."-Zeilen sind String-Literale, kein echter Exit.
if re.match(r'\s*(?:msg|_msg|file_\w+)\s*(?:=|\()', _s): continue
if re.search(r'sys\.exit\(1\)\s*(?:#.*)?$', _s):
# Prüfe die 8 Zeilen davor (nur Code, keine Kommentare)
_ctx_lines = [l for l in _lines[max(0, _j-8):_j]
if l.strip() and not l.strip().startswith('#')]
_ctx = ' '.join(_ctx_lines)
if 'print(' not in _ctx and '::warning' not in _ctx and '::error' not in _ctx:
msg = (f"Block {_bi} L~{_j}: sys.exit(1) ohne vorherige "
f"Fehlermeldung (print/annotation) – schwer debuggbar")
file_warnings.append(msg)
warnings.append({"file": fname,
"check": "Stiller sys.exit(1)",
"detail": msg})
break
# ── Check 19: Commit-Step Pattern A (unbedingte Push-Loop) ───
# Fehlerbild: "git diff --staged --quiet || git commit ..." und danach
# trotzdem immer fetch/rebase/push. Wenn gar nichts committet wurde,
# kann der Step bei GitHub-Störung unnötig mit exit(1) fehlschlagen.
for _si, _shell in enumerate(shell_blocks):
_shell_code = "\n".join(
_code_only(l) for l in _shell.splitlines()
if _code_only(l).strip()
)
_has_pattern_a = (
"git diff --staged --quiet || git commit" in _shell_code
and "for attempt in 1 2 3 4 5; do" in _shell_code
)
if _has_pattern_a:
msg = ("Commit-Step nutzt Pattern A – Push-Loop läuft auch ohne Commit. "
"Bevorzuge 'if git diff --staged --quiet; then echo ...; else git commit; push-loop; fi'")
file_warnings.append(msg)
warnings.append({"file": fname,
"check": "Commit-Step Pattern A",
"detail": msg})
break
# ── Check 20: MIN_*-Guards mit ungueltigen Schwellenwerten ─────
# Fuer neue Workflows ohne harte Baseline soll zumindest erkannt werden,
# wenn ein Guard versehentlich auf 0 gesetzt wurde.
for _m in re.finditer(r'\b(MIN_[A-Z0-9_]+)\s*=\s*([\d_]+)', content):
_var = _m.group(1)
_val = int(_m.group(2).replace("_", ""))
if _val <= 0:
msg = f"{_var}={_val} ist ungueltig – Guard muss > 0 sein"
file_errors.append(msg)
errors.append({"file": fname,
"check": f"{_var} ungueltig",
"detail": msg})
# ── Check 21: Guard-Exit vor Report/Status-Write ──────────────
# Wenn ein Workflow Report-/Status-Dateien committet und der Commit-Step
# mit if: always() laufen soll, muessen diese Dateien VOR sys.exit(1)
# geschrieben werden. Sonst bleibt ein stale Report im Repo stehen.
_shell_reports = []
for _shell in shell_blocks:
_shell_reports.extend(_report_like_paths(_extract_git_added_files(_shell)))
_shell_reports = sorted(set(_shell_reports))
_written_reports = _extract_report_targets_from_blocks(blocks)
_report_targets = sorted(set(_shell_reports + _written_reports))
_has_always_commit = _has_commit_step_always(content) and bool(_report_targets)
for _bi, _blk in enumerate(blocks):
_ded = textwrap.dedent(_blk)
for _guard in _find_guard_exits(_ded):
if _guard["exit_code"] != 1:
continue
if not _has_always_commit:
continue
if not _has_prior_report_write(_guard["lines_before"], _report_targets):
msg = (f"Block {_bi} L~{_guard['line']}: sys.exit(1) vor Report/Status-Write "
f"({', '.join(_report_targets[:4])}) – stale Report moeglich trotz Commit-Step mit if: always()")
file_errors.append(msg)
errors.append({"file": fname,
"check": "Report nach Guard-Exit",
"detail": msg})
break
# ── Check 22: Guard-Workflow mit Report-Dateien aber Commit nicht always ──
# Neue Workflows mit Guard + Report sollten den Commit-Step absichern,
# damit der Fehler-Report auch bei sys.exit(1) committet wird.
_has_guard_exit = any(
_guard["exit_code"] == 1
for _blk in blocks
for _guard in _find_guard_exits(textwrap.dedent(_blk))
)
if _has_guard_exit and _report_targets and not _has_commit_step_always(content):
msg = ("Workflow hat Guard + Report/Status-Dateien (geschrieben oder committed), "
"aber kein 'if: always()' am Commit-Step – Guard-Fail kann stale Reports hinterlassen")
file_warnings.append(msg)
warnings.append({"file": fname,
"check": "Commit-Step nicht always()",
"detail": msg})
# ── Check 23: gezielte Regression-Checks fuer neue Findings ─────
GH_EXPR_OPEN = "$" + "{{"
issue_expr = f"{GH_EXPR_OPEN} github.event.issue.number }}"
run_expr = f"{GH_EXPR_OPEN} github.run_id }}"
community_key = f"key: netshield-seen-db-community-{issue_expr}-{run_expr}"
community_restore_prefix = f"netshield-seen-db-community-{issue_expr}-"
if fname == "false_positive_checker.yml":
# FIX CHK23a / RACE5: Zwei gültige Commit-Patterns:
# (A) ALT: git checkout --theirs mit allen 3 FP-Dateien
# (B) NEU (FIX RACE5): git reset --hard + cp /tmp/ für Overwrite-Dateien
# + Dedup-Append für Log. Kein checkout --theirs mehr nötig.
_fp_files = ["false_positives_log.txt", "false_positive_report.md", "false_positives_set.json"]
_has_old_pattern = 'git checkout --theirs' in content
_has_new_pattern = 'git reset --hard' in content and '/tmp/local_fp_' in content
if _has_old_pattern:
# ALT: checkout --theirs muss alle 3 Dateien enthalten
_checkout_block = re.search(
r'git checkout --theirs.*?(?=\n\s*git add|\n\s*GIT_EDITOR|$)',
content, re.DOTALL
)
_checkout_text = _checkout_block.group(0) if _checkout_block else ""
_missing_fp = [f for f in _fp_files if f not in _checkout_text]
if _missing_fp:
msg = (f"git checkout --theirs im Commit-Step fehlen Dateien: "
f"{', '.join(_missing_fp)} – Regression im FP-Checker-Commit-Step")
file_errors.append(msg)
errors.append({"file": fname, "check": "Fehlende FP-Dateien im checkout", "detail": msg})
# Append-only Log muss vor Rebase gesichert werden
if '/tmp/local_fp_log_appends.txt' not in content:
msg = ("Append-only Datei (false_positives_log.txt) wird bei rebase --theirs "
"nicht gesichert – Log-Eintraege gehen bei parallelen Runs verloren "
"(Regression FIX-RACE4)")
file_errors.append(msg)
errors.append({"file": fname, "check": "Append-only Rebase-Schutz fehlt", "detail": msg})
elif _has_new_pattern:
# NEU (FIX RACE5): reset --hard + Backup/Restore
# Overwrite-Dateien müssen via cp gesichert werden
_missing_backup = [f for f in ["local_fp_report.md", "local_fp_set.json"]
if f not in content]
if _missing_backup:
msg = (f"reset --hard Pattern: Backup fehlt für {', '.join(_missing_backup)} "
f"– Overwrite-Dateien gehen bei Push-Konflikt verloren")
file_errors.append(msg)
errors.append({"file": fname, "check": "Fehlende FP-Backup im reset-Pattern", "detail": msg})
# Append-only Log muss gesichert werden
if '/tmp/local_fp_log_appends.txt' not in content:
msg = ("reset --hard Pattern: Append-only Log-Sicherung fehlt "
"(/tmp/local_fp_log_appends.txt)")
file_errors.append(msg)
errors.append({"file": fname, "check": "Append-only Log-Schutz fehlt", "detail": msg})
else:
# Weder altes noch neues Pattern – Commit-Step fehlt Konfliktstrategie
msg = ("Commit-Step hat weder checkout --theirs noch reset --hard Pattern "
"– Push-Konflikte werden nicht behandelt")
file_errors.append(msg)
errors.append({"file": fname, "check": "Fehlende Konfliktstrategie", "detail": msg})
if fname == "update-blocklist.yml":
# FIX: Negativer Lookahead schließt korrekte "-- theirs -- all_countries" Muster aus.
# Alter Regex: \s+ matchte auch den "--" Separator → immer True (False Positive).
if re.search(r'git checkout --theirs(?!\s+--\b)\s+all_countries_ipv4\.txt', content):
msg = "git checkout --theirs fuer all_countries_ipv4.txt ohne '--' – inkonsistent/fragil"
file_errors.append(msg)
errors.append({"file": fname, "check": "Fehlendes -- bei git checkout", "detail": msg})
# FIX CHK23-BL: SPOF-Check – bei Totalausfall (0 IPs) muss
# eine ::error Annotation ausgegeben werden, nicht nur ::warning.
if 'len(all_ips) == 0' not in content:
msg = (
"update-blocklist.yml hat keinen expliziten Null-IP-Guard (len(all_ips)==0) – "
"Totalausfall von hackinggate.com wuerde nur als Warnung erscheinen"
)
file_warnings.append(msg)
warnings.append({"file": fname, "check": "Null-IP SPOF-Guard fehlt", "detail": msg})
if fname == "community_ip_report.yml":
restore_section = content.split('restore-keys: |', 1)[1] if 'restore-keys: |' in content else ''
if community_key in content and community_restore_prefix not in restore_section:
msg = ("Community-Cache nutzt issue+run_id als Key, aber kein issue-spezifisches restore-prefix – "
"seen_db akkumuliert pro Issue nicht")
file_errors.append(msg)
errors.append({"file": fname, "check": "Community-Cache Restore-Key", "detail": msg})
# FIX CHK23-RACE3 / RACE5: Zwei gültige Commit-Patterns:
# (A) ALT: checkout --theirs + /tmp/local_log_appends.txt Sicherung
# (B) NEU (FIX RACE5): reset --hard + Dedup-Re-Apply aus /tmp/
_has_old_community = 'git checkout --theirs' in content
_has_new_community = 'git reset --hard' in content and '/tmp/local_log_appends.txt' in content
if _has_old_community and '/tmp/local_log_appends.txt' not in content:
msg = ("Append-only Dateien (community_reports_log.txt, community_reported_ips.txt) "
"werden bei rebase --theirs nicht gesichert – Datenverlust bei parallelen Issues "
"(Regression FIX-RACE3)")
file_errors.append(msg)
errors.append({"file": fname, "check": "Append-only Rebase-Schutz fehlt", "detail": msg})
elif not _has_old_community and not _has_new_community:
# Weder altes noch neues Pattern
if 'git push' in content:
msg = ("Commit-Step hat weder checkout --theirs noch reset --hard Pattern "
"– Push-Konflikte auf Append-only-Dateien werden nicht behandelt")
file_warnings.append(msg)
warnings.append({"file": fname, "check": "Fehlende Konfliktstrategie", "detail": msg})
# FIX CHK23-CRASH1: Regression-Check – Post Issue Comment Step
# muss if: always() haben und community_result.txt Existenz prüfen,
# damit User bei Python-Crash trotzdem Feedback im Issue bekommt.
if 'Post Issue Comment' in content:
# Suche den Step-Block
_pic_m = re.search(r'-\s+name:\s+Post Issue Comment.*?(?=\n\s+-\s+name:|\Z)', content, re.DOTALL)
if _pic_m:
_pic_block = _pic_m.group(0)
if 'if: always()' not in _pic_block:
msg = ("Post Issue Comment Step hat kein 'if: always()' – "
"User bekommt bei Python-Crash kein Feedback (Regression FIX-CRASH1)")
file_errors.append(msg)
errors.append({"file": fname, "check": "Comment-Step nicht always()", "detail": msg})
if 'INTERNAL_ERROR' not in _pic_block:
msg = ("Post Issue Comment Step hat keinen INTERNAL_ERROR-Branch – "
"fehlende community_result.txt erzeugt unkontrollierten Crash (Regression FIX-CRASH1)")
file_errors.append(msg)
errors.append({"file": fname, "check": "INTERNAL_ERROR Branch fehlt", "detail": msg})
if fname == "honeydb_monitor.yml":
if 'data.get("from_id") or data.get("next_from_id")' in content:
msg = ("Paging-Cursor nutzt boolean 'or' – Wert 0 ist falsy und kann auf das falsche Feld zurückfallen")
file_errors.append(msg)
errors.append({"file": fname, "check": "Unsicherer Paging-Cursor", "detail": msg})
if fname == "auto_feed_discovery.yml":
# set() + add() ist korrekt (keine Duplikate moeglich) → kein Fehler.
# Nur Liste + append() melden, da dort Duplikate auftreten koennen.
_uses_set = 'to_expire = set()' in content and 'to_expire.add(' in content
_uses_list = 'to_expire = []' in content and 'to_expire.append(' in content
if _uses_list and not _uses_set:
msg = ("Expiry-Loop nutzt Liste statt Set – doppelte Delete-Kandidaten bleiben moeglich")
file_errors.append(msg)
errors.append({"file": fname, "check": "Expiry-Deduplizierung fehlt", "detail": msg})
if fname == "feed_health_monitor.yml":
# Nur echten Code prüfen – Kommentare wie "# statt ast.literal_eval"
# dürfen keinen False-Positive auslösen.
_fhm_code = '\n'.join(
l for l in content.splitlines()
if l.strip() and not l.strip().startswith('#')
)
if 'ast.literal_eval(' in _fhm_code or re.search(r'^\s*import ast\b', _fhm_code, re.MULTILINE):
msg = ("Feed-Extraktion nutzt ast.literal_eval – bricht bei Kommentaren im SOURCES-Dict")
file_warnings.append(msg)
warnings.append({"file": fname, "check": "Fragile SOURCES-Extraktion", "detail": msg})
if fname == "geo_tagger.yml":
if 'MAX_ELAPSED_SECONDS' not in content and 'started_at = time.time()' not in content:
msg = ("Geo-Tagger hat keinen Elapsed-Time-Guard vor dem Laender-Loop")
file_warnings.append(msg)
warnings.append({"file": fname, "check": "Elapsed-Time-Guard fehlt", "detail": msg})
if fname == "netshield_report_generator.yml":
if 'cols[3]' in content:
msg = ("README-Fallback schreibt hart in cols[3] – Tabellenschema-Aenderungen koennen falsche Spalte aktualisieren")
file_warnings.append(msg)
warnings.append({"file": fname, "check": "Harte README-Spalte", "detail": msg})
if fname == "update_combined_blacklist.yml":
sort_pos = content.find('active_ips.sort(key=lambda e:')
sort_window = content[sort_pos:sort_pos + 220] if sort_pos != -1 else ''
if sort_pos != -1 and 'except Exception:' not in sort_window:
msg = ("Numerische Sortierung von active_ips ist nicht gegen kaputte Eintraege abgesichert")
file_warnings.append(msg)
warnings.append({"file": fname, "check": "Sort-Key ohne Fallback", "detail": msg})
# FIX CHK23-CB: Combined hat zwei ThreadPoolExecutor-Bloecke
# (Hauptfeed timeout=600, Auto-Feed timeout=120). Beide muessen
# except TimeoutError haben. Altes Literal-Match war zu eng.
_tpe_timeouts = re.findall(r'as_completed\(futures,\s*timeout=(\d+)\)', content)
_has_timeout_exc = 'except TimeoutError' in content
if _tpe_timeouts and not _has_timeout_exc:
msg = (
f"as_completed mit timeout= ({', '.join(_tpe_timeouts)}s) aber "
f"kein 'except TimeoutError' – Timeout wird nicht behandelt"
)
file_warnings.append(msg)
warnings.append({"file": fname, "check": "Timeout-Handling fehlt", "detail": msg})
# ── Check 24: Unpinned Actions (Sicherheitsrisiko) ──────────────
# uses: actions/xxx@v4 ohne SHA-Hash erlaubt Supply-Chain-Angriffe.
# Korrekt: uses: actions/checkout@<sha> # v4.2.2
_uses_re = re.compile(r'^\s*-?\s*uses:\s*(\S+)', re.MULTILINE)
for _um in _uses_re.finditer(content):
_action = _um.group(1).strip()
# FIX CHK24b: Kommentarzeilen überspringen.
_um_line = content.splitlines()[content[:_um.start()].count('\n')]
if _um_line.lstrip().startswith('#'):
continue
if '/' not in _action:
continue # Lokale Actions (./xxx)
if '@' not in _action:
msg = f"uses: {_action} – kein @version/@sha angegeben"
file_errors.append(msg)
errors.append({"file": fname, "check": "Unpinned Action", "detail": msg})
continue
_ref = _action.split('@', 1)[1]
# SHA-Hashes sind EXAKT 40 Hex-Zeichen. Nur auf Länge zu
# prüfen reicht nicht: ein Branch-Name mit >= 40 Zeichen
# (z.B. 'feature-very-long-malicious-branch-name-xx') würde
# als SHA durchrutschen und wäre damit ein weiterer
# Supply-Chain-Angriffsvektor.
if not re.fullmatch(r'[a-f0-9]{40}', _ref) and not _ref.startswith('$'):
msg = f"uses: {_action} – Tag statt SHA-Hash (Supply-Chain-Risiko)"
file_warnings.append(msg)
warnings.append({"file": fname, "check": "Action nicht SHA-pinned", "detail": msg})
# ── Check 25: timeout-minutes fehlt am Job ───────────────────
# Ohne timeout-minutes kann ein haengender Job bis zum GitHub-
# Standard (360min) laufen und Minuten verbrauchen.
_in_jobs_section = False
for _line in content.splitlines():
if _line.strip() == 'jobs:':
_in_jobs_section = True
continue
if _in_jobs_section and re.match(r'^ [a-z]', _line) and ':' in _line:
_job_name = _line.strip().rstrip(':')
_job_pos = content.find(_line)
_job_window = content[_job_pos:_job_pos+500]
_before_steps = _job_window.split('steps:')[0] if 'steps:' in _job_window else _job_window
if 'timeout-minutes:' not in _before_steps:
msg = f"Job '{_job_name}' hat kein timeout-minutes – haengende Runs verbrauchen bis zu 360min"
file_warnings.append(msg)
warnings.append({"file": fname, "check": "timeout-minutes fehlt", "detail": msg})
break
# ── Check 26: persist-credentials bei git push ───────────────
# Wenn ein Workflow git push ausfuehrt, braucht checkout
# persist-credentials: true. Fehlt es, schlaegt push fehl.
# FIX: content_shell_no_comments verwenden, damit Kommentare
# wie "# nicht per git push" nicht faelschlich triggern.
_has_git_push = 'git push' in content_shell_no_comments
_has_persist = 'persist-credentials: true' in content
if _has_git_push and not _has_persist:
msg = "git push verwendet aber checkout ohne persist-credentials: true – Push wird fehlschlagen"
file_errors.append(msg)
errors.append({"file": fname, "check": "persist-credentials fehlt", "detail": msg})
# ── Check 27: Permissions-Audit ──────────────────────────────
# contents: write nur wenn der Workflow tatsaechlich committet.
_has_contents_write = 'contents: write' in content
_has_commit = 'git commit' in content_shell_no_comments or 'git push' in content_shell_no_comments
if _has_contents_write and not _has_commit:
msg = "permissions: contents: write gesetzt aber kein git commit/push erkennbar – Least-Privilege-Verletzung"
file_warnings.append(msg)
warnings.append({"file": fname, "check": "Permissions zu breit", "detail": msg})
# ── Check 28: Doppelter Import im selben Block ───────────────
# Doppeltes "import re" oder "import sys" in einem Block deutet auf
# Copy-Paste-Fehler hin (os und sys sind ausgenommen da haeufig
# absichtlich re-importiert wegen Namespace-Isolation).
for _bi, _blk in enumerate(blocks):
_import_counts = {}
for _il in _blk.splitlines():
_il_s = _il.strip()
if _il_s.startswith('#'):
continue
_im = re.match(r'(?:from\s+(\S+)\s+)?import\s+(.+)', _il_s)
if _im:
for _mod in _im.group(2).split(','):
_mod_name = _mod.strip().split()[0].strip()
if _mod_name:
_import_counts[_mod_name] = _import_counts.get(_mod_name, 0) + 1
_dups = {m: c for m, c in _import_counts.items() if c > 1 and m not in {'os', 'sys'}}
if _dups:
msg = (f"Block {_bi}: Doppelte Imports: {', '.join(f'{m}({c}x)' for m, c in _dups.items())} – "
f"moeglicherweise Copy-Paste-Artefakt")
file_warnings.append(msg)
warnings.append({"file": fname, "check": "Doppelter Import", "detail": msg})
break
# ── Check 29: Fehlende FORCE_JAVASCRIPT_ACTIONS_TO_NODE24 ────
# GitHub empfiehlt diese env-Variable fuer alle Workflows mit
# actions/checkout >= v4 um Node.js 24 Kompatibilitaet zu erzwingen.
if 'actions/checkout@' in content and 'FORCE_JAVASCRIPT_ACTIONS_TO_NODE24' not in content:
msg = "FORCE_JAVASCRIPT_ACTIONS_TO_NODE24 env-Variable fehlt – Node.js Kompatibilitaetsproblem moeglich"
file_warnings.append(msg)
warnings.append({"file": fname, "check": "Node24 env fehlt", "detail": msg})
# ── Check 30: Leerungsschutz combined_threat_blacklist_ipv4.txt ──
# FIX-CB1: Regression-Check – MIN_COMBINED-Guard muss vorhanden sein
# bevor combined_threat_blacklist_ipv4.txt geschrieben wird.
if fname == "update_combined_blacklist.yml":
_code_lines = [l for l in content.splitlines() if l.strip() and not l.strip().startswith('#')]
_cb_code = "\n".join(_code_lines)
_has_min_combined = 'MIN_COMBINED' in _cb_code
_has_combined_write = 'combined_threat_blacklist_ipv4.txt' in _cb_code
if _has_combined_write and not _has_min_combined:
msg = ("combined_threat_blacklist_ipv4.txt wird geschrieben ohne MIN_COMBINED-Leerungsschutz "
"– leere Blacklist kann an OPNsense ausgeliefert werden (Regression FIX-CB1)")
file_errors.append(msg)
errors.append({"file": fname, "check": "Leerungsschutz combined fehlt", "detail": msg})
# ── Check 31: Leerungsschutz active_blacklist_ipv4.txt ───────────
# FIX-CB2: Regression-Check – MIN_ACTIVE-Guard muss vorhanden sein.
if fname == "update_combined_blacklist.yml":
_has_min_active = 'MIN_ACTIVE' in content
_has_active_write = 'active_blacklist_ipv4.txt' in content
if _has_active_write and not _has_min_active:
msg = ("active_blacklist_ipv4.txt wird geschrieben ohne MIN_ACTIVE-Leerungsschutz "
"(Regression FIX-CB2)")
file_warnings.append(msg)
warnings.append({"file": fname, "check": "Leerungsschutz active fehlt", "detail": msg})
# ── Check 32: Watchlist-Sentinel in active_blacklist-Loop ────────
# FIX-CB3: Regression-Check – Sentinel-Wert 2000-01-01 muss in der
# active_blacklist-Loop explizit übersprungen werden.
if fname == "update_combined_blacklist.yml":
_active_loop_start = content.find('active_ips = []')
_active_loop_end = content.find('active_ips.sort(', _active_loop_start) if _active_loop_start != -1 else -1
_active_loop_block = content[_active_loop_start:_active_loop_end] if _active_loop_start != -1 and _active_loop_end != -1 else ""
if _active_loop_block and '2000-01-01' not in _active_loop_block:
msg = ("active_blacklist-Loop überspringt Watchlist-Sentinel (last='2000-01-01') nicht explizit "
"– implizite Filterung über cutoff_30 kann bei Refactoring brechen (Regression FIX-CB3)")
file_warnings.append(msg)
warnings.append({"file": fname, "check": "Watchlist-Sentinel fehlt in active-Loop", "detail": msg})
# ── Check 33: Score Decay Cache-Alters-Schwelle ──────────────────
# FIX-SD1: Regression-Check – Schwelle darf nicht unter 12h sinken
# (6h erzeugte falsch-positive Warnungen nach einem verpassten Run).
if fname == "score_decay_monitor.yml":
_threshold_m = re.search(r'db_age_hours\s*>\s*(\d+)', content)
if _threshold_m:
_threshold_val = int(_threshold_m.group(1))
if _threshold_val < 12:
msg = (f"Cache-Alters-Schwelle ist {_threshold_val}h (< 12h) – "
f"erzeugt falsch-positive Warnungen nach einzelnem verpassten Combined-Run "
f"(Regression FIX-SD1)")
file_warnings.append(msg)
warnings.append({"file": fname, "check": "Cache-Schwelle zu streng", "detail": msg})
# ── Check 34: required_protection für false_positive_checker ─────
# FIX-WH9: Regression-Check – false_positive_checker.yml muss auf
# WHITELIST_CIDRS + is_whitelisted_entry geprüft werden, nicht is_protected_entry.
if fname == "workflow_health_checker.yml":
_rp_block_m = re.search(r'required_protection\s*=\s*\{(.+?)\}', content, re.DOTALL)
if _rp_block_m:
_rp_block = _rp_block_m.group(1)
_fp_line_m = re.search(r'"false_positive_checker\.yml"\s*:\s*\[([^\]]+)\]', _rp_block)
if _fp_line_m:
_fp_tokens = _fp_line_m.group(1)
if 'is_protected_entry' in _fp_tokens:
msg = ("required_protection für false_positive_checker.yml enthält 'is_protected_entry' "
"– diese Funktion existiert dort nicht, führt zu falsch-positiven Schutzfehlern "
"(Regression FIX-WH9)")
file_errors.append(msg)
errors.append({"file": fname, "check": "Falscher required_protection-Token", "detail": msg})
if 'is_whitelisted' not in _fp_tokens or 'whitelist.json' not in _fp_tokens:
msg = ("required_protection für false_positive_checker.yml fehlt 'whitelist.json' "
"oder 'is_whitelisted' (Regression FIX-WH9)")
file_warnings.append(msg)
warnings.append({"file": fname, "check": "required_protection unvollständig", "detail": msg})
# ── Check 35: sensor_total toter Code in honeydb_monitor ─────────
# FIX-HDB1: Regression-Check – sensor_total muss in stats{} einfließen
# oder anderweitig verwendet werden, nicht nur akkumuliert werden.
if fname == "honeydb_monitor.yml":
_has_sensor_total_acc = 'sensor_total +=' in content
# FIX CHK35: Suche ab dem LETZTEN sensor_total += (Paging-Schleife
# hat mehrere). find() liefert die ERSTE Stelle – bei mehreren +=
# war _post_acc zu kurz und übersah die stats[]-Zuweisung danach.
# Außerdem: explizite Klammern verhindern Python-Operatorprioritätsfalle
# ('print(... and ...' wurde als 'print(' or ('... and ...') ausgewertet).
_last_acc_pos = content.rfind('sensor_total +=')
_post_acc = content[_last_acc_pos:] if _last_acc_pos != -1 else ''
_in_stats = ('stats[' in _post_acc[:800]) and ('sensor_total' in _post_acc[:800])
_in_print = ('print(' in _post_acc[:800]) and ('sensor_total' in _post_acc[:800])
_sensor_used_after = _in_stats or _in_print
if _has_sensor_total_acc and not _sensor_used_after:
msg = ("sensor_total wird akkumuliert aber nicht in stats{} oder print() genutzt "
"– toter Code (Regression FIX-HDB1)")
file_warnings.append(msg)
warnings.append({"file": fname, "check": "sensor_total toter Code", "detail": msg})
# ── Check 36: IP-Listen Duplikate ────────────────────────────
# Doppelte IPs in combined/active/confidence.txt verschaffen
# einzelnen IPs ein überproportionales Gewicht und blähen die
# Datei auf. Geprüft werden alle .txt-Ausgabedateien des Workflows.
_ip_list_targets = {
"update_combined_blacklist.yml": [
"combined_threat_blacklist_ipv4.txt",
"active_blacklist_ipv4.txt",
],
"update_confidence_blacklist.yml": [
"blacklist_confidence40_ipv4.txt",
],
}
if fname in _ip_list_targets:
for _ip_file in _ip_list_targets[fname]:
if not os.path.exists(_ip_file):
continue
try:
with open(_ip_file, encoding="utf-8") as _ipf:
_ip_lines = [l.strip() for l in _ipf if l.strip() and not l.startswith('#')]
_ip_total = len(_ip_lines)
_ip_unique = len(set(_ip_lines))
_ip_dups = _ip_total - _ip_unique
if _ip_dups > 0:
_dup_pct = (_ip_dups / _ip_total * 100) if _ip_total else 0
_level = "errors" if _dup_pct > 1 else "warnings"
msg = (f"{_ip_file}: {_ip_dups:,} doppelte IPs "
f"({_dup_pct:.1f}% von {_ip_total:,}) – "
f"Scoring-Verzerrung und aufgeblähte Datei")
if _level == "errors":
file_errors.append(msg)
errors.append({"file": fname, "check": "IP-Listen Duplikate", "detail": msg})
else:
file_warnings.append(msg)
warnings.append({"file": fname, "check": "IP-Listen Duplikate", "detail": msg})
print(f" {'❌' if _level == 'errors' else '🟡'} {_ip_file}: {_ip_dups:,} Duplikate ({_dup_pct:.1f}%)")
else:
print(f" ✅ {_ip_file}: keine Duplikate ({_ip_total:,} IPs)")
except Exception as _e:
print(f" ⏭ {_ip_file}: nicht lesbar ({_e})")
# ── Check 37: Private IPs in Blacklists ──────────────────────
# RFC-1918-Adressen (10.x, 172.16-31.x, 192.168.x) in
# Ausgabe-Listen blockieren legale Infrastruktur und deuten auf
# fehlende PROTECTED_CIDRS-Filterung hin.
_PRIVATE_PATTERNS = re.compile(
r'^(?:10\.\d+\.\d+\.\d+'
r'|172\.(?:1[6-9]|2\d|3[01])\.\d+\.\d+'
r'|192\.168\.\d+\.\d+)$'
)
if fname in _ip_list_targets:
for _ip_file in _ip_list_targets[fname]:
if not os.path.exists(_ip_file):
continue
try:
with open(_ip_file, encoding="utf-8") as _ipf:
_priv_hits = [
l.strip().split()[0]
for l in _ipf
if l.strip() and not l.startswith('#')
and _PRIVATE_PATTERNS.match(l.strip().split()[0])
]
if _priv_hits:
msg = (f"{_ip_file}: {len(_priv_hits)} private RFC-1918-IP(s) "
f"– erste: {', '.join(_priv_hits[:5])} "
f"– PROTECTED_CIDRS-Filter prüfen")
file_errors.append(msg)
errors.append({"file": fname, "check": "Private IPs in Blacklist", "detail": msg})
print(f" ❌ {_ip_file}: {len(_priv_hits)} private IPs gefunden")
else:
print(f" ✅ {_ip_file}: keine privaten IPs")
except Exception as _e:
print(f" ⏭ {_ip_file}: nicht lesbar ({_e})")
# ── Check 38: Secret-Leak in echo ────────────────────────────
# echo $DBLOPEN secrets.* DBLCLOSE gibt Token ins Actions-Log.
# GitHub maskiert Secrets in run:-Bloecken nicht wenn sie via
# echo explizit ausgegeben werden.
# WICHTIG: {{ }} via chr() gebaut - GH Actions wuerde Literal-
# Sequenz als Expression expandieren und die YAML-Datei brechen.
_dbl_open = chr(123) + chr(123) # ergibt {{ ohne GH-Trigger
_dbl_close = chr(125) + chr(125) # ergibt }}
_secret_echo_re = re.compile(
r'echo\s+["' + "'" + r'"]?\$'
+ re.escape(_dbl_open)
+ r'\s*secrets\.[A-Za-z0-9_]+\s*'
+ re.escape(_dbl_close)
)
for _si, _shell in enumerate(shell_blocks):
for _sl_m in _secret_echo_re.finditer(_shell):
_snip = _shell[max(0, _sl_m.start()-10):_sl_m.end()+40].strip()
_leak_label = 'echo $' + _dbl_open + ' secrets.* ' + _dbl_close
msg = (f"shell-Block {_si}: '{_leak_label}' – "
f"Secret-Wert landet im Actions-Log (Token-Leak): {_snip[:80]}")
file_errors.append(msg)
errors.append({"file": fname, "check": "Secret-Leak in echo", "detail": msg})
break
# ── Check 39: YAML Tab-Zeichen ────────────────────────────────
# YAML-Parser lehnen Tab-Indentation ab (YAML 1.2 §6.1).
# GitHub Actions-YAML wirft einen unklaren Parse-Fehler statt
# eines hilfreichen Hinweises auf den Tab.
_tab_lines = []
for _tl_no, _tl in enumerate(content.splitlines(), 1):
# Nur führende Tabs zählen; Tabs im Python-Code (EOF-Block)
# sind kein YAML-Problem.
if _tl.startswith('\t'):
_tab_lines.append(_tl_no)
if _tab_lines:
_tab_sample = _tab_lines[:5]
msg = (f"YAML-Tab-Zeichen in Zeile(n) {_tab_sample} "
f"({'...' if len(_tab_lines) > 5 else ''}{len(_tab_lines)} gesamt) "
f"– GitHub YAML-Parser bricht mit irreführendem Fehler")
file_errors.append(msg)
errors.append({"file": fname, "check": "YAML Tab-Zeichen", "detail": msg})
results[fname] = {
"blocks": len(blocks),
"errors": file_errors,
"warnings": file_warnings,
"cron": cron_re.findall(content),
}
icon = "❌" if file_errors else ("⚠️" if file_warnings else "✅")
print(f"{icon} {fname}: {len(blocks)} Block(s) | "
f"{len(file_errors)} Fehler | {len(file_warnings)} Warnungen")
# ── Whitelist-Sync-Check ──────────────────────────────────────────
# Beide Workflows laden die Whitelist aus .github/workflows/whitelist.json.
# Prüfe: Datei existiert, hat genug Einträge, wird von beiden referenziert.
_wl_json_path = f"{WORKFLOWS_DIR}/whitelist.json"
_combined_path = f"{WORKFLOWS_DIR}/update_combined_blacklist.yml"
_fp_path = f"{WORKFLOWS_DIR}/false_positive_checker.yml"
if os.path.exists(_wl_json_path):
try:
with open(_wl_json_path, encoding="utf-8") as _wlf:
_wl_data = json.load(_wlf)
_wl_count = len(_wl_data.get("entries", []))
if _wl_count < 100:
warnings.append({
"file": "whitelist.json",
"check": "Whitelist zu klein",
"detail": f"Nur {_wl_count} Einträge (erwartet >100) – möglicherweise korrupt",
})
print(f"⚠️ whitelist.json: nur {_wl_count} Einträge")
else:
# FIX: CIDR-Format-Validierung – Tippfehler (z.B. /33) erkennen
import ipaddress as _ipaddr_check
_wl_invalid = []
for _wl_entry in _wl_data.get("entries", []):
try:
_ipaddr_check.ip_network(_wl_entry, strict=False)
except Exception:
_wl_invalid.append(_wl_entry)
if _wl_invalid:
errors.append({
"file": "whitelist.json",
"check": "Ungültige CIDR-Einträge",
"detail": f"{len(_wl_invalid)} ungültige Einträge: {', '.join(_wl_invalid[:5])}{'…' if len(_wl_invalid) > 5 else ''} – werden still ignoriert",
})
print(f"❌ whitelist.json: {len(_wl_invalid)} ungültige CIDR-Einträge")
else:
print(f"✅ whitelist.json: alle {_wl_count} Einträge sind gültige CIDRs")
# Prüfe ob beide Workflows die Datei referenzieren
_refs_ok = True
for _wf_path, _wf_name in [(_combined_path, "update_combined_blacklist.yml"), (_fp_path, "false_positive_checker.yml")]:
if os.path.exists(_wf_path):
_wf_text = open(_wf_path).read()
if "whitelist.json" not in _wf_text:
errors.append({
"file": _wf_name,
"check": "Whitelist-Referenz fehlt",
"detail": "whitelist.json wird nicht referenziert – Whitelist-Filterung inaktiv",
})
_refs_ok = False
print(f"❌ {_wf_name}: referenziert whitelist.json nicht")
if _refs_ok:
print(f"✅ Whitelist-Sync: whitelist.json mit {_wl_count} Einträgen, beide Workflows referenzieren sie")
except Exception as _ex:
errors.append({
"file": "whitelist.json",
"check": "Whitelist nicht lesbar",
"detail": f"whitelist.json konnte nicht geladen werden: {_ex}",
})
print(f"❌ whitelist.json nicht lesbar: {_ex}")
else:
errors.append({
"file": "whitelist.json",
"check": "Whitelist fehlt",
"detail": "whitelist.json nicht in .github/workflows/ – Whitelist-Filterung inaktiv",
})
print("❌ whitelist.json nicht vorhanden")
# ── HIGH_QUALITY ↔ SOURCES hq=True Konsistenz ─────────────────────
# Feeds mit hq=True in SOURCES muessen auch in HIGH_QUALITY stehen,
# sonst wird ip_in_hq nie befuellt → "last" wird nie gesetzt → IPs
# altern still aus (Bug-DP1 Regressionsschutz).
if os.path.exists(_combined_path):
_combined_text = open(_combined_path).read()
# HIGH_QUALITY Set extrahieren (Brace-Counting statt einfacher Regex)
_hq_m = re.search(r'HIGH_QUALITY\s*=\s*\{', _combined_text)
_hq_names = set()
if _hq_m:
_hq_start = _hq_m.end() - 1
_hq_depth = 0
_hq_end = _hq_start
for _i, _ch in enumerate(_combined_text[_hq_start:], _hq_start):
if _ch == '{': _hq_depth += 1
elif _ch == '}':
_hq_depth -= 1
if _hq_depth == 0: _hq_end = _i + 1; break
_hq_block = _combined_text[_hq_start:_hq_end]
# Nur String-Literale auf Code-Zeilen (keine Kommentare)
for _hq_line in _hq_block.splitlines():
_hq_line_code = _hq_line.split('#')[0]
for _m in re.finditer(r'"([a-z0-9_]+)"', _hq_line_code):
_hq_names.add(_m.group(1))
# SOURCES mit hq=True
_src_hq = set()
_src_re = re.compile(
r'^\s*"([^"\n]+)"\s*:\s*\(\s*"[^"\n]+"\s*,\s*True\s*\)',
re.MULTILINE,
)
for _m in _src_re.finditer(_combined_text):
_src_hq.add(_m.group(1))
_in_hq_not_src = _hq_names - _src_hq
_in_src_not_hq = _src_hq - _hq_names
if _in_src_not_hq:
errors.append({
"file": "update_combined_blacklist.yml",
"check": "HIGH_QUALITY ↔ SOURCES Drift",
"detail": f"hq=True in SOURCES aber nicht in HIGH_QUALITY: {', '.join(sorted(_in_src_not_hq)[:8])} – IPs altern still aus (Bug-DP1)",
})
print(f"❌ HIGH_QUALITY Drift: {len(_in_src_not_hq)} Feeds hq=True aber nicht in HIGH_QUALITY")
elif _in_hq_not_src:
warnings.append({
"file": "update_combined_blacklist.yml",
"check": "HIGH_QUALITY ↔ SOURCES Drift",
"detail": f"In HIGH_QUALITY aber nicht hq=True in SOURCES: {', '.join(sorted(_in_hq_not_src)[:8])} – toter Code",
})
print(f"⚠️ HIGH_QUALITY Drift: {len(_in_hq_not_src)} in HIGH_QUALITY ohne SOURCES-Eintrag")
else:
print(f"✅ HIGH_QUALITY ↔ SOURCES: konsistent ({len(_hq_names)} HQ-Feeds)")
# ── Check 40: Untrusted Feed hq-Guard ────────────────────────────
# Feeds mit hq=True sollten nur von bekannten, verifizierten Betreibern
# stammen. Ein unbekannter Betreiber mit hq=True kann die Blockliste
# mit falschen Positiven überfluten (kein Score-Altern, keine Prüfung).
# Bekannte, vertrauenswürdige Quellen-Schlüsselwörter (Betreiber-Whitelist):
_TRUSTED_HQ_KEYWORDS = {
"abuseipdb", "emerging", "feodo", "blocklist.de", "spamhaus",
"cinsscore", "cinsarmy", "dshield", "abuse.ch", "urlhaus", "sslbl",
"bruteforce", "eicar", "mirai", "honeypot", "honeydb",
"cve", "ransomware", "botnet", "threatfox", "openphish",
"myip.ms", "stopforumspam", "greensnow", "binarydefense",
"torproject", "dan.me", "ipsum", "stamparm", "firehol",
"talos", "microsoft", "et_pro", "proofpoint",
"crowdsec", "dataplane", "threatview", "turris",
"criticalpath", "c2_iplist", "c2intelfeeds",
# The Honeynet Project (501(c)(3), seit 1999) – betreibt u.a.
# GreedyBear (T-POT-Cluster Aggregator auf greedybear.honeynet.org).
# Deckt greedybear_recent und zukünftige Honeynet-Feeds ab.
"honeynet",
}
if os.path.exists(_combined_path):
_cb_text = open(_combined_path).read()
# Alle SOURCES-Einträge extrahieren: "name": ("url", hq_bool)
_sources_re = re.compile(
r'^\s*"([^"\n]+)"\s*:\s*\(\s*"([^"\n]+)"\s*,\s*(True|False)\s*\)',
re.MULTILINE,
)
_untrusted_hq = []
for _sm in _sources_re.finditer(_cb_text):
_feed_name = _sm.group(1).lower()
_feed_url = _sm.group(2).lower()
_feed_hq = _sm.group(3) == "True"
if not _feed_hq:
continue
# Vertrauenswürdig wenn Name ODER URL ein bekanntes Keyword enthält
_trusted = any(
kw in _feed_name or kw in _feed_url
for kw in _TRUSTED_HQ_KEYWORDS
)
if not _trusted:
_untrusted_hq.append((_sm.group(1), _sm.group(2)))
if _untrusted_hq:
_detail = "; ".join(
f'"{n}" ({u[:60]})' for n, u in _untrusted_hq[:6]
)
warnings.append({
"file": "update_combined_blacklist.yml",
"check": "Untrusted Feed hq=True",
"detail": (
f"{len(_untrusted_hq)} Feed(s) mit hq=True ohne bekannten Betreiber "
f"– IPs bleiben dauerhaft in active_blacklist ohne Score-Altern: {_detail}"
),
})
print(f"⚠️ Untrusted hq=True: {len(_untrusted_hq)} Feed(s) ohne bekannten Betreiber")
else:
# SOURCES nicht vorhanden → nur wenn Datei existiert aber leer
if _sources_re.search(_cb_text):
print(f"✅ Untrusted Feed hq-Guard: alle hq=True Feeds haben bekannten Betreiber")
else:
print(f"⏭ Untrusted Feed hq-Guard: keine SOURCES gefunden")
# ── DNS_WHITELIST Konsistenz ───────────────────────────────────────
# Mehrere Workflows definieren DNS_WHITELIST eigenstaendig.
# Alle muessen identisch sein.
_dns_wl_per_file = {}
for _fname in files:
_fpath = f"{WORKFLOWS_DIR}/{_fname}"
_ftext = open(_fpath).read()
_dns_m = re.search(r'DNS_WHITELIST\s*=\s*\{', _ftext)
if not _dns_m:
continue
_dns_start = _dns_m.end() - 1
_dns_depth = 0
_dns_end = _dns_start
for _i, _ch in enumerate(_ftext[_dns_start:], _dns_start):
if _ch == '{': _dns_depth += 1
elif _ch == '}':
_dns_depth -= 1
if _dns_depth == 0: _dns_end = _i + 1; break
_dns_block = _ftext[_dns_start:_dns_end]
_dns_ips = {m.strip() for m in re.findall(r'"([^"]+)"', _dns_block)}
_dns_wl_per_file[_fname] = _dns_ips
if len(_dns_wl_per_file) >= 2:
_dns_reference = None
_dns_ref_file = None
_dns_drifts = []
for _fname, _ips in sorted(_dns_wl_per_file.items()):
if _dns_reference is None:
_dns_reference = _ips
_dns_ref_file = _fname
continue
if _ips != _dns_reference:
_only_ref = _dns_reference - _ips
_only_this = _ips - _dns_reference
_dns_drifts.append((_fname, len(_only_ref), len(_only_this)))
if _dns_drifts:
_drift_detail = "; ".join(
f"{f} (+{a}/-{r} vs {_dns_ref_file})" for f, r, a in _dns_drifts
)
warnings.append({
"file": " ↔ ".join(sorted(_dns_wl_per_file.keys())),
"check": "DNS_WHITELIST Drift",
"detail": f"DNS_WHITELIST nicht identisch: {_drift_detail}",
})
print(f"⚠️ DNS_WHITELIST Drift: {len(_dns_drifts)} Datei(en) abweichend")
else:
print(f"✅ DNS_WHITELIST: konsistent über {len(_dns_wl_per_file)} Dateien")
# ── LOCAL_FEEDS ↔ Sub-Workflow Ausgabedateien ──────────────────────
# LOCAL_FEEDS in update_combined_blacklist.yml referenziert Dateien die
# von Sub-Workflows erzeugt werden. Prüfe ob diese Dateien auch
# tatsaechlich von einem Workflow geschrieben werden.
if os.path.exists(_combined_path):
_combined_text_lf = open(_combined_path).read()
_lf_m = re.search(r'LOCAL_FEEDS\s*=\s*\{', _combined_text_lf)
_local_feed_files = set()
if _lf_m:
_lf_start = _lf_m.end() - 1
_lf_depth = 0
_lf_end = _lf_start
for _i, _ch in enumerate(_combined_text_lf[_lf_start:], _lf_start):
if _ch == '{': _lf_depth += 1
elif _ch == '}':
_lf_depth -= 1
if _lf_depth == 0: _lf_end = _i + 1; break
_lf_block = _combined_text_lf[_lf_start:_lf_end]
for _m in re.finditer(r'"([^"]+\.txt)"', _lf_block):
_local_feed_files.add(_m.group(1))
# Prüfe welche LOCAL_FEED-Dateien von keinem Workflow geschrieben werden
_all_written = set()
for _fname in files:
_fpath = f"{WORKFLOWS_DIR}/{_fname}"
_ftext = open(_fpath).read()
_fblocks = re.findall(r"python3 << 'EOF'(.*?)\n\s*EOF", _ftext, re.DOTALL)
for _blk in _fblocks:
for _m in re.finditer(r'open\(\s*["\']([a-z0-9_.\-]+\.txt)["\']', _blk):
_all_written.add(_m.group(1))
for _m in re.finditer(r'\b([A-Z_]+)\s*=\s*["\']([a-z0-9_.\-]+\.txt)["\']', _blk):
_all_written.add(_m.group(2))
_orphan_feeds = _local_feed_files - _all_written
if _orphan_feeds:
warnings.append({
"file": "update_combined_blacklist.yml",
"check": "LOCAL_FEEDS verwaist",
"detail": f"LOCAL_FEEDS referenziert Dateien ohne erkennbaren Erzeuger-Workflow: {', '.join(sorted(_orphan_feeds))}",
})
print(f"⚠️ LOCAL_FEEDS: {len(_orphan_feeds)} verwaiste Dateien")
else:
print(f"✅ LOCAL_FEEDS: alle {len(_local_feed_files)} Dateien haben Erzeuger-Workflow")
# ── Doppelte Feed-URLs ─────────────────────────────────────────────
# Gleiche URL in mehreren Workflows/Dicts fuehrt zu doppeltem today_count
# und aufgeblaesenem Confidence-Score (dokumentierter Bug-Typ).
_url_to_workflows = {}
_url_re = re.compile(r'"(https?://[^"\s]+)"')
for _fname in files:
_fpath = f"{WORKFLOWS_DIR}/{_fname}"
_ftext = open(_fpath).read()
_fblocks = re.findall(r"python3 << 'EOF'(.*?)\n\s*EOF", _ftext, re.DOTALL)
for _blk in _fblocks:
# Nur URLs in SOURCES/CVE_SOURCES/HONEYPOT_SOURCES-artigen Dicts
for _line in _blk.splitlines():
_line_code = _line.split('#')[0]
for _m in _url_re.finditer(_line_code):
_url = _m.group(1)
# Nur Feed-URLs (nicht GitHub API, nicht ScaniteX, nicht hackinggate)
if any(skip in _url for skip in [
'api.github.com', 'scanitex.com', 'hackinggate.com',
'api.abuseipdb.com', 'honeydb.io/api',
]):
continue
_url_to_workflows.setdefault(_url, set()).add(_fname)
_dup_urls = {u: wfs for u, wfs in _url_to_workflows.items() if len(wfs) > 1}
if _dup_urls:
# Nur melden wenn die URL nicht bereits als Duplikat-Kommentar dokumentiert ist
# Zwei Erkennungsmethoden:
# (a) Spezifisch: URL-Kurzname + Schlüsselwort in derselben Zeile
# (b) Generell: HINWEIS-Block mit Schlüsselwort "BEWUSST/überlappen/Overlap"
# → dokumentiert die gesamte Feed-Liste als absichtlich doppelt
_doc_keywords_specific = r'(?:entfernt|identisch|doppelt)'
_doc_keywords_general = r'(?:BEWUSST|ueberlappen|überlappen|[Oo]verlap|intentional)'
_real_dups = []
for _url, _wfs in sorted(_dup_urls.items()):
_documented = False
_url_short = _url.split('/')[-1][:30]
for _wf in _wfs:
_wf_text = open(f"{WORKFLOWS_DIR}/{_wf}").read()
# (a) Spezifisch: "# ... entfernt/identisch/doppelt ... <url_short>"
if re.search(rf'#.*{_doc_keywords_specific}.*{re.escape(_url_short[:15])}', _wf_text, re.I):
_documented = True
break
# (b) Generell: HINWEIS-Block der die Feed-Liste als bewusst doppelt dokumentiert
if re.search(rf'#\s*HINWEIS:.*{_doc_keywords_general}', _wf_text):
_documented = True
break
if not _documented:
_real_dups.append((_url, _wfs))
if _real_dups:
_dup_detail = "; ".join(
f"{u.split('/')[-1][:40]} in {'+'.join(sorted(wfs))}"
for u, wfs in _real_dups[:5]
)
warnings.append({
"file": "Cross-Workflow",
"check": "Doppelte Feed-URLs",
"detail": f"{len(_real_dups)} URL(s) in mehreren Workflows – today_count Aufblaehung moeglich: {_dup_detail}",
})
print(f"⚠️ Doppelte Feed-URLs: {len(_real_dups)} undokumentierte Duplikate")
else:
print(f"✅ Feed-URLs: {len(_dup_urls)} Duplikate gefunden, alle dokumentiert")
else:
print(f"✅ Feed-URLs: keine Duplikate ({len(_url_to_workflows)} URLs)")
# ── EXPIRY_DAYS Konsistenz ────────────────────────────────────────
# update_combined_blacklist und auto_feed_discovery muessen gleiche
# Werte fuer EXPIRY_DAYS und WATCHLIST_EXPIRY_DAYS verwenden.
_expiry_map = {}
_wl_expiry_map = {}
for _fname in files:
_ftext = open(f"{WORKFLOWS_DIR}/{_fname}").read()
for _m in re.finditer(r'\bEXPIRY_DAYS\s*=\s*(\d+)', _ftext):
_expiry_map.setdefault(int(_m.group(1)), []).append(_fname)
for _m in re.finditer(r'\bWATCHLIST_EXPIRY_DAYS\s*=\s*(\d+)', _ftext):
_wl_expiry_map.setdefault(int(_m.group(1)), []).append(_fname)
if len(_expiry_map) > 1:
_detail = "; ".join(f"{v}d in {'+'.join(wfs)}" for v, wfs in sorted(_expiry_map.items()))
errors.append({
"file": "Cross-Workflow",
"check": "EXPIRY_DAYS Inkonsistenz",
"detail": f"Verschiedene EXPIRY_DAYS-Werte: {_detail} – IPs werden unterschiedlich lang gehalten",
})
print(f"❌ EXPIRY_DAYS Inkonsistenz: {_detail}")
elif _expiry_map:
print(f"✅ EXPIRY_DAYS: konsistent ({list(_expiry_map.keys())[0]}d)")
if len(_wl_expiry_map) > 1:
_detail = "; ".join(f"{v}d in {'+'.join(wfs)}" for v, wfs in sorted(_wl_expiry_map.items()))
warnings.append({
"file": "Cross-Workflow",
"check": "WATCHLIST_EXPIRY_DAYS Inkonsistenz",
"detail": f"Verschiedene WATCHLIST_EXPIRY_DAYS: {_detail}",
})
print(f"⚠️ WATCHLIST_EXPIRY_DAYS Inkonsistenz: {_detail}")
# ── Cache-Key Prefix Validierung ──────────────────────────────────
# Jeder Workflow darf nur in seinen eigenen Cache-Prefix schreiben.
# Combined → v2, FP-Checker → fp, Auto-Discovery → afd, Community → community
_cache_write_rules = {
"update_combined_blacklist.yml": "v2",
"false_positive_checker.yml": "fp",
"auto_feed_discovery.yml": "afd",
"community_ip_report.yml": "community",
}
for _fname, _expected_prefix in _cache_write_rules.items():
_fpath = f"{WORKFLOWS_DIR}/{_fname}"
if not os.path.exists(_fpath):
continue
_ftext = open(_fpath).read()
# Cache Save-Keys extrahieren (key: netshield-seen-db-...)
# Beide Patterns: actions/cache (impliziter Save) und actions/cache/save
_save_keys = re.findall(
r'key:\s*(netshield-seen-db-[^\s\n]+)',
_ftext
)
for _sk in _save_keys:
# restore-keys Zeilen ignorieren (die dürfen andere Prefixe haben)
_sk_ctx_start = max(0, _ftext.find(_sk) - 200)
_sk_ctx = _ftext[_sk_ctx_start:_ftext.find(_sk) + len(_sk)]
if 'restore-keys' in _sk_ctx:
continue
if f"-{_expected_prefix}-" not in _sk and not _sk.startswith(f"netshield-seen-db-{_expected_prefix}"):
# Ausnahme: Score Decay nutzt cache/restore (read-only) – kein Write
if _fname == "score_decay_monitor.yml":
continue
warnings.append({
"file": _fname,
"check": "Cache-Key Prefix falsch",
"detail": f"Save-Key '{_sk}' hat nicht den erwarteten Prefix '{_expected_prefix}' – potenzieller Cache-Overwrite",
})
print(f"⚠️ Cache-Key: {_fname} schreibt in '{_sk}' statt '{_expected_prefix}'")
# ── Workflow-Abhaengigkeitskette ──────────────────────────────────
# Sub-Workflows muessen VOR update_combined_blacklist laufen.
# Prüfe ob die Cron-Zeiten das garantieren.
# Mindest-Puffer (Minuten) zwischen Sub-Workflow und Combined.
# Combined braucht ~20min – 60min Puffer ist konservativ sicher.
_DEP_MIN_GAP = 60
_dependency_chain = {
"honeydb_monitor.yml": "update_combined_blacklist.yml",
"honeypot_monitor.yml": "update_combined_blacklist.yml",
"update_bot_detector.yml": "update_combined_blacklist.yml",
"cve_to_ip_mapper.yml": "update_combined_blacklist.yml",
}
for _dep_src, _dep_dst in _dependency_chain.items():
if _dep_src not in file_crons or _dep_dst not in file_crons:
continue
_src_mins = []
_dst_mins = []
for _expr in file_crons.get(_dep_src, []):
_src_mins.extend(_cron_minutes_of_day(_expr.strip()))
for _expr in file_crons.get(_dep_dst, []):
_dst_mins.extend(_cron_minutes_of_day(_expr.strip()))
_src_mins = sorted(set(_src_mins))
_dst_mins = sorted(set(_dst_mins))
if not _src_mins or not _dst_mins:
continue
# Fuer jeden Sub-Workflow-Zeitpunkt: gibt es einen Combined-Lauf
# mindestens _DEP_MIN_GAP Minuten spaeter (am selben Tag)?
_dep_bad = []
for _s in _src_mins:
# FIX BUG-5: >= statt >, sonst werden gleichzeitige Combined-Läufe
# übersprungen und der Checker meldet faelschlich OK (sieht den
# naechsten Combined 3h spaeter als "valid 180min Puffer"). Δ=0
# ist eine echte Kollision: Combined liest die Sub-Workflow-Files
# aus dem Checkout, also den Stand vom vorherigen Lauf (bis zu
# 6h alt bei honeypot). Mit >= wird gap=0 < _DEP_MIN_GAP korrekt
# als Warnung gemeldet.
_later = [_d for _d in _dst_mins if _d >= _s]
if not _later:
# Sub-Workflow laueft nach dem letzten Combined des Tages.
# Combined laueft aber auch frueher (*/3h), daher kein harter Fehler –
# die Daten werden beim naechsten frueheren Combined-Lauf verarbeitet.
pass
else:
_gap = _later[0] - _s
if _gap < _DEP_MIN_GAP:
_dep_bad.append(
f"{_fmt_minute_of_day(_s)} → {_fmt_minute_of_day(_later[0])} "
f"({_gap}min < {_DEP_MIN_GAP}min Mindestpuffer)"
)
if _dep_bad:
warnings.append({
"file": f"{_dep_src} → {_dep_dst}",
"check": "Sub-Workflow Puffer zu knapp",
"detail": (
f"Sub-Workflow laueft zu knapp vor Combined – "
f"Output moeglicherweise nicht rechtzeitig verfuegbar: "
+ "; ".join(_dep_bad[:5])
),
})
# ── Score-Modell Konsistenz (combined ↔ confidence) ──────────────
# Die Confidence-Score-Berechnung muss in update_combined_blacklist.yml
# und update_confidence_blacklist.yml identisch sein. Drift fuehrt zu
# inkonsistenten active_blacklist und confidence40-Listen.
#
# FIX CHK-SCORE-SSOT: Die Scoring-Logik wurde in
# netshield_common.calculate_confidence zentralisiert. Wenn beide
# Workflows diese Funktion importieren, ist Divergenz per Definition
# ausgeschlossen – kein Vergleich von Inline-Literalen noetig. Nur
# wenn einer der Workflows die Funktion NICHT importiert (also
# Inline-Scoring verwendet), wird die alte Literal-Drift-Pruefung
# aktiviert.
_score_files = {
"update_combined_blacklist.yml": f"{WORKFLOWS_DIR}/update_combined_blacklist.yml",
"update_confidence_blacklist.yml": f"{WORKFLOWS_DIR}/update_confidence_blacklist.yml",
}
_score_texts = {}
for _sf_name, _sf_path in _score_files.items():
if os.path.exists(_sf_path):
_score_texts[_sf_name] = open(_sf_path).read()
# Existiert die zentrale Helfer-Funktion ueberhaupt?
_common_path = "scripts/netshield_common.py"
_helper_exists = (
os.path.exists(_common_path)
and re.search(r'^\s*def\s+calculate_confidence\s*\(',
open(_common_path).read(), re.MULTILINE) is not None
)
def _imports_calc_conf(text):
# Akzeptiere Multi-Line-Import aus netshield_common, z.B.
# from netshield_common import (
# ... , calculate_confidence, ... )
_imp_re = re.compile(
r'from\s+netshield_common\s+import\s*\(([^)]*)\)', re.DOTALL
)
for _m in _imp_re.finditer(text):
if re.search(r'\bcalculate_confidence\b', _m.group(1)):
return True
# Single-line Import
return bool(re.search(
r'from\s+netshield_common\s+import\s+[^\n]*\bcalculate_confidence\b',
text,
))
if len(_score_texts) == 2 and _helper_exists and all(
_imports_calc_conf(_t) for _t in _score_texts.values()
):
# SSOT: beide Workflows nutzen calculate_confidence aus
# netshield_common. Divergenz strukturell ausgeschlossen.
print("✅ Score-Modell: zentrale calculate_confidence in netshield_common – SSOT")
elif len(_score_texts) == 2:
_score_models = {}
for _sf_name, _sf_text in _score_texts.items():
_sa = re.findall(r'score_a\s*=\s*(\d+)', _sf_text)
_sb = re.findall(r'score_b\s*=\s*(\d+)', _sf_text)
_sc = re.findall(r'score_c\s*=\s*(\d+)', _sf_text)
_sd = re.findall(r'score_d\s*=\s*(\d+)', _sf_text)
_score_models[_sf_name] = {
"a": tuple(_sa), "b": tuple(_sb),
"c": tuple(_sc), "d": tuple(_sd),
}
_sm_keys = list(_score_models.keys())
_sm_a, _sm_b = _score_models[_sm_keys[0]], _score_models[_sm_keys[1]]
# Wenn einer der Workflows gar keine Score-Literale enthaelt aber
# auch calculate_confidence nicht importiert, kann das auf einen
# unvollstaendigen Refactor hindeuten – als Warnung, nicht Fehler.
_empty = [k for k, v in _score_models.items()
if not any(v[d] for d in "abcd")]
if len(_empty) == 1:
_missing = _empty[0]
msg = (f"{_missing}: keine Inline-Score-Literale und kein "
f"Import von calculate_confidence – Scoring-Quelle unklar")
warnings.append({
"file": _missing,
"check": "Score-Modell Quelle unklar",
"detail": msg,
})
print(f"⚠️ {msg}")
else:
_score_diffs = []
for _dim in ("a", "b", "c", "d"):
if _sm_a[_dim] != _sm_b[_dim]:
_score_diffs.append(f"score_{_dim}")
if _score_diffs:
errors.append({
"file": " ↔ ".join(_sm_keys),
"check": "Score-Modell Divergenz",
"detail": f"Unterschiedliche Schwellen in: {', '.join(_score_diffs)} – active_blacklist und confidence40 bewerten IPs unterschiedlich",
})
print(f"❌ Score-Modell Divergenz: {', '.join(_score_diffs)}")
else:
print(f"✅ Score-Modell: identisch in combined und confidence")
# ── PROTECTED_CIDRS Sync ────────────────────────────────────────
# Mehrere Workflows definieren PROTECTED_CIDRS. Drift fuehrt dazu
# dass ein Workflow private IPs durchlaesst die ein anderer filtert.
_prot_per_file = {}
for _fname in files:
_ftext = open(f"{WORKFLOWS_DIR}/{_fname}").read()
_prot_m = re.search(r'PROTECTED_CIDRS\s*=\s*\[', _ftext)
if not _prot_m:
continue
_p_start = _prot_m.end()
_p_depth = 1
_p_i = _p_start
while _p_i < len(_ftext) and _p_depth > 0:
if _ftext[_p_i] == '[': _p_depth += 1
elif _ftext[_p_i] == ']': _p_depth -= 1
_p_i += 1
_p_block = _ftext[_p_start:_p_i-1]
_p_cidrs = {c.strip() for c in re.findall(r'"([^"]+)"', _p_block)}
_prot_per_file[_fname] = _p_cidrs
if len(_prot_per_file) >= 2:
_prot_ref = None
_prot_ref_file = None
_prot_drifts = []
for _fname, _cidrs in sorted(_prot_per_file.items()):
if _prot_ref is None:
_prot_ref = _cidrs
_prot_ref_file = _fname
continue
if _cidrs != _prot_ref:
_prot_drifts.append(_fname)
if _prot_drifts:
warnings.append({
"file": " ↔ ".join(sorted(_prot_per_file.keys())),
"check": "PROTECTED_CIDRS Drift",
"detail": f"PROTECTED_CIDRS nicht identisch: {', '.join(_prot_drifts)} weichen von {_prot_ref_file} ab",
})
print(f"⚠️ PROTECTED_CIDRS Drift: {len(_prot_drifts)} Datei(en) abweichend")
else:
print(f"✅ PROTECTED_CIDRS: konsistent über {len(_prot_per_file)} Dateien")
# ── Actions SHA Aktualitaet ─────────────────────────────────────
# Prüfe ob alle Workflows dieselbe SHA fuer actions/checkout und
# actions/cache verwenden. Gemischte Versionen deuten auf vergessene
# Updates hin.
_action_shas = {} # "actions/checkout" → {sha: [files]}
for _fname in files:
_ftext = open(f"{WORKFLOWS_DIR}/{_fname}").read()
for _am in re.finditer(r'uses:\s*(actions/\S+)@([a-f0-9]{40})', _ftext):
_a_name = _am.group(1)
_a_sha = _am.group(2)
_action_shas.setdefault(_a_name, {}).setdefault(_a_sha, []).append(_fname)
for _a_name, _sha_map in _action_shas.items():
if len(_sha_map) > 1:
# Heuristik: die SHA, die in den meisten Workflows steckt,
# gilt als Referenz. Ohne GitHub-API-Zugriff koennen wir
# die "neueste" SHA nicht verlaesslich bestimmen – die
# Mehrheits-SHA ist aber ein guter Proxy, weil vergessene
# Update-PRs meist nur einzelne Dateien betreffen.
_majority_sha = max(_sha_map.keys(), key=lambda s: len(_sha_map[s]))
_outdated = {sha: wfs for sha, wfs in _sha_map.items() if sha != _majority_sha}
_outdated_files = [f for wfs in _outdated.values() for f in wfs]
if _outdated_files:
warnings.append({
"file": ", ".join(sorted(set(_outdated_files))[:5]),
"check": f"{_a_name} Version-Drift",
"detail": f"{len(_outdated_files)} Datei(en) weichen von der Mehrheits-SHA ab – Update vergessen oder verfrueht?",
})
print(f"⚠️ {_a_name}: {len(_outdated_files)} Dateien mit abweichender SHA")
# ══════════════════════════════════════════════════════════════════
# PRODUCTION HEALTH CHECKS (Strategie-Dokument v3)
# Prüft Laufzeit-Zustand der Pipeline, nicht nur statische Code-Qualität.
# Alarm-Logik: WARN = beobachten, CRITICAL = sofort prüfen.
# ══════════════════════════════════════════════════════════════════
prod_health = [] # list of {"level": "WARN"|"CRITICAL", "check": ..., "detail": ...}
_TS_RE_HEALTH = re.compile(r'#\s*Aktualisiert:\s*(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2})\s*UTC')
def _file_age_hours(filepath):
"""Liest Aktualisiert-Timestamp aus Datei-Header, gibt Alter in Stunden zurück."""
if not os.path.exists(filepath):
return None
try:
with open(filepath, encoding="utf-8") as _fh:
for _ in range(10):
_line = _fh.readline()
if not _line:
break
_m = _TS_RE_HEALTH.search(_line)
if _m:
_dt = datetime.strptime(_m.group(1), "%Y-%m-%d %H:%M").replace(tzinfo=timezone.utc)
return (now - _dt).total_seconds() / 3600
except Exception as _suppressed:
print(f"WARN: suppressed Exception: {_suppressed}", file=sys.stderr)
return None
def _count_ips(filepath):
if not os.path.exists(filepath):
return None
count = 0
with open(filepath, encoding="utf-8") as _fh:
for _line in _fh:
if _line.strip() and not _line.startswith("#"):
count += 1
return count
print("\n" + "=" * 60)
print("PRODUCTION HEALTH CHECKS")
print("=" * 60)
# ── PH1: seen_db Integrität (via seen_db_meta.json) ───────────────
_meta_file = "seen_db_meta.json"
if os.path.exists(_meta_file):
try:
with open(_meta_file, encoding="utf-8") as _mf:
_meta = json.load(_mf)
_meta_ips = _meta.get("total_ips", 0)
_meta_mb = _meta.get("size_mb", 0)
_meta_err = _meta.get("error")
_meta_upd = _meta.get("updated", "")
if _meta_err:
prod_health.append({
"level": "CRITICAL", "check": "seen_db Integrität",
"detail": f"seen_db_meta.json meldet Fehler: {_meta_err}",
})
print(f" 🔴 seen_db: Fehler gemeldet – {_meta_err}")
elif _meta_ips == 0:
prod_health.append({
"level": "CRITICAL", "check": "seen_db Integrität",
"detail": "seen_db_meta.json: total_ips=0 – seen_db ist leer oder nicht vorhanden",
})
print(" 🔴 seen_db: 0 IPs (leer oder fehlend)")
elif _meta_ips < 10_000:
prod_health.append({
"level": "WARN", "check": "seen_db Integrität",
"detail": f"seen_db_meta.json: nur {_meta_ips:,} IPs (erwartet >10.000) – möglicherweise unvollständig",
})
print(f" 🟡 seen_db: {_meta_ips:,} IPs (niedrig)")
else:
print(f" ✅ seen_db: {_meta_ips:,} IPs, {_meta_mb} MB")
# Alter der seen_db_meta prüfen
if _meta_upd:
try:
_meta_dt = datetime.strptime(_meta_upd.replace(" UTC", ""), "%Y-%m-%d %H:%M").replace(tzinfo=timezone.utc)
_meta_age_h = (now - _meta_dt).total_seconds() / 3600
if _meta_age_h > 10:
prod_health.append({
"level": "CRITICAL", "check": "seen_db Aktualität",
"detail": f"seen_db_meta.json ist {_meta_age_h:.0f}h alt (letztes Update: {_meta_upd}) – Combined-Pipeline steht still",
})
print(f" 🔴 seen_db Alter: {_meta_age_h:.0f}h (CRITICAL >10h)")
elif _meta_age_h > 6:
prod_health.append({
"level": "WARN", "check": "seen_db Aktualität",
"detail": f"seen_db_meta.json ist {_meta_age_h:.0f}h alt (letztes Update: {_meta_upd}) – mindestens 1 Combined-Run verpasst",
})
print(f" 🟡 seen_db Alter: {_meta_age_h:.0f}h (WARN >6h)")
else:
print(f" ✅ seen_db Alter: {_meta_age_h:.1f}h")
except Exception as _suppressed:
print(f"WARN: suppressed Exception: {_suppressed}", file=sys.stderr)
except Exception as _ex:
prod_health.append({
"level": "CRITICAL", "check": "seen_db Integrität",
"detail": f"seen_db_meta.json nicht lesbar: {_ex}",
})
print(f" 🔴 seen_db_meta.json nicht lesbar: {_ex}")
else:
prod_health.append({
"level": "WARN", "check": "seen_db Integrität",
"detail": "seen_db_meta.json nicht vorhanden – Combined-Workflow noch nie gelaufen?",
})
print(" 🟡 seen_db_meta.json nicht vorhanden")
# ── PH2: Aktualität der Output-Listen ─────────────────────────────
_freshness_checks = [
# (datei, warn_h, crit_h, beschreibung)
("active_blacklist_ipv4.txt", 6, 10, "Active Blacklist (Stufe 2)"),
("combined_threat_blacklist_ipv4.txt", 6, 10, "Combined Blacklist (Stufe 1)"),
("blacklist_confidence40_ipv4.txt", 6, 10, "Confidence-40 Blacklist"),
("feed_health_report.md", 30, 50, "Feed Health Report"),
]
for _fpath, _warn_h, _crit_h, _desc in _freshness_checks:
_age = _file_age_hours(_fpath)
if _age is None:
if _fpath.endswith("_report.md"):
# Reports sind optional – kein Alarm
print(f" ⏭ {_desc}: Datei nicht vorhanden (optional)")
else:
prod_health.append({
"level": "WARN", "check": f"Aktualität: {_desc}",
"detail": f"{_fpath} nicht vorhanden",
})
print(f" 🟡 {_desc}: nicht vorhanden")
elif _age > _crit_h:
prod_health.append({
"level": "CRITICAL", "check": f"Aktualität: {_desc}",
"detail": f"{_fpath} ist {_age:.0f}h alt (CRITICAL-Schwelle: {_crit_h}h) – Pipeline-Stillstand",
})
print(f" 🔴 {_desc}: {_age:.0f}h alt (>{_crit_h}h)")
elif _age > _warn_h:
prod_health.append({
"level": "WARN", "check": f"Aktualität: {_desc}",
"detail": f"{_fpath} ist {_age:.0f}h alt (WARN-Schwelle: {_warn_h}h)",
})
print(f" 🟡 {_desc}: {_age:.0f}h alt (>{_warn_h}h)")
else:
print(f" ✅ {_desc}: {_age:.1f}h alt")
# ── PH3: Output-Sanity – IP-Anzahlen plausibel? ──────────────────
# Baselines: erwartete Mindest-IP-Anzahlen bei gesunder Pipeline.
# Werte konservativ gewählt – unterschreiten deutet auf Feed-Ausfall
# oder Pipeline-Fehler hin, nicht auf normale Schwankungen.
_sanity_checks = [
# (datei, min_warn, min_crit, beschreibung)
("combined_threat_blacklist_ipv4.txt", 50_000, 10_000, "Combined Blacklist"),
("active_blacklist_ipv4.txt", 5_000, 1_000, "Active Blacklist"),
("blacklist_confidence40_ipv4.txt", 5_000, 1_000, "Confidence-40 Blacklist"),
("cve_exploit_ips.txt", 500, 100, "CVE Exploit IPs"),
("honeypot_ips.txt", 500, 100, "Honeypot IPs"),
("bot_detector_blacklist_ipv4.txt", 50, 10, "Bot-Detector"),
]
for _fpath, _min_warn, _min_crit, _desc in _sanity_checks:
_count = _count_ips(_fpath)
if _count is None:
continue # Datei nicht vorhanden – Frische-Check oben meldet das bereits
if _count < _min_crit:
prod_health.append({
"level": "CRITICAL", "check": f"Output-Sanity: {_desc}",
"detail": f"{_fpath}: nur {_count:,} IPs (CRITICAL <{_min_crit:,}) – massiver Datenverlust oder Pipeline-Fehler",
})
print(f" 🔴 {_desc}: {_count:,} IPs (<{_min_crit:,})")
elif _count < _min_warn:
prod_health.append({
"level": "WARN", "check": f"Output-Sanity: {_desc}",
"detail": f"{_fpath}: nur {_count:,} IPs (WARN <{_min_warn:,}) – möglicherweise Feed-Ausfälle",
})
print(f" 🟡 {_desc}: {_count:,} IPs (<{_min_warn:,})")
else:
print(f" ✅ {_desc}: {_count:,} IPs")
# ── PH4: Feed Health Auswirkungen ─────────────────────────────────
_fh_file = "feed_health_status.json"
if os.path.exists(_fh_file):
try:
with open(_fh_file, encoding="utf-8") as _fhf:
_fh = json.load(_fhf)
_fh_summary = _fh.get("summary", {})
_fh_errors = _fh_summary.get("error", 0)
_fh_warns = _fh_summary.get("warn", 0)
_fh_ok = _fh_summary.get("ok", 0)
_fh_total = _fh_ok + _fh_warns + _fh_errors
_failed = _fh.get("failed_feeds", [])
if _fh_errors >= 5:
prod_health.append({
"level": "CRITICAL", "check": "Feed-Ausfälle",
"detail": f"{_fh_errors} von {_fh_total} Feeds ausgefallen ({', '.join(_failed[:8])}{'…' if len(_failed) > 8 else ''}) – Blacklist-Qualität stark beeinträchtigt",
})
print(f" 🔴 Feed Health: {_fh_errors}/{_fh_total} ausgefallen")
elif _fh_errors >= 2:
prod_health.append({
"level": "WARN", "check": "Feed-Ausfälle",
"detail": f"{_fh_errors} von {_fh_total} Feeds ausgefallen: {', '.join(_failed[:5])}",
})
print(f" 🟡 Feed Health: {_fh_errors}/{_fh_total} ausgefallen")
else:
print(f" ✅ Feed Health: {_fh_ok}/{_fh_total} OK, {_fh_errors} Fehler, {_fh_warns} Warnungen")
except Exception as _ex:
print(f" ⏭ Feed Health: feed_health_status.json nicht lesbar ({_ex})")
else:
print(" ⏭ Feed Health: feed_health_status.json nicht vorhanden")
# ── PH5: Drift-Erkennung (Vergleich mit letztem Health-Status) ────
# workflow_health_status.json speichert die letzte Zusammenfassung.
# Vergleich der IP-Anzahlen mit den gespeicherten Werten erkennt
# ungewöhnliches Wachstum oder Schrumpfen.
_prev_status_file = "workflow_health_status.json"
if os.path.exists(_prev_status_file):
try:
with open(_prev_status_file, encoding="utf-8") as _psf:
_prev = json.load(_psf)
_prev_ip_counts = _prev.get("ip_counts", {})
if _prev_ip_counts:
_current_counts = {}
for _fpath, _, _, _desc in _sanity_checks:
_c = _count_ips(_fpath)
if _c is not None:
_current_counts[_fpath] = _c
for _fpath, _cur in _current_counts.items():
_prev_c = _prev_ip_counts.get(_fpath)
if _prev_c is None or _prev_c == 0:
continue
_change_pct = ((_cur - _prev_c) / _prev_c) * 100
if _change_pct < -30:
prod_health.append({
"level": "CRITICAL", "check": f"Drift: {_fpath}",
"detail": f"{_fpath}: {_prev_c:,} → {_cur:,} ({_change_pct:+.0f}%) – starker Rückgang seit letztem Check",
})
print(f" 🔴 Drift {_fpath}: {_change_pct:+.0f}% ({_prev_c:,} → {_cur:,})")
elif _change_pct < -15:
prod_health.append({
"level": "WARN", "check": f"Drift: {_fpath}",
"detail": f"{_fpath}: {_prev_c:,} → {_cur:,} ({_change_pct:+.0f}%) – deutlicher Rückgang",
})
print(f" 🟡 Drift {_fpath}: {_change_pct:+.0f}% ({_prev_c:,} → {_cur:,})")
elif _change_pct > 200:
# PH6: Extremes Wachstum (>+200%) ist fast immer ein Bug –
# kaputter Feed-Parser liest Kommentare als IPs, neuer Feed
# mit massivem False-Positive-Anteil, oder Dedup gescheitert.
prod_health.append({
"level": "CRITICAL", "check": f"Drift: {_fpath}",
"detail": f"{_fpath}: {_prev_c:,} → {_cur:,} ({_change_pct:+.0f}%) – extremes Wachstum, vermutlich Parser-/Dedup-Bug",
})
print(f" 🔴 Drift {_fpath}: {_change_pct:+.0f}% ({_prev_c:,} → {_cur:,})")
elif _change_pct > 50:
prod_health.append({
"level": "WARN", "check": f"Drift: {_fpath}",
"detail": f"{_fpath}: {_prev_c:,} → {_cur:,} ({_change_pct:+.0f}%) – ungewöhnliches Wachstum",
})
print(f" 🟡 Drift {_fpath}: {_change_pct:+.0f}% ({_prev_c:,} → {_cur:,})")
else:
print(f" ✅ Drift {_fpath}: {_change_pct:+.1f}% (stabil)")
else:
print(" ⏭ Drift: kein vorheriger IP-Count gespeichert (erster Lauf)")
except Exception:
print(" ⏭ Drift: workflow_health_status.json nicht lesbar")
else:
print(" ⏭ Drift: kein vorheriger Status vorhanden (erster Lauf)")
# ── PH7: Whitelist-Leakage am Output ──────────────────────────────
# Testet das ENDERGEBNIS, nicht den Code: Kommt eine geschützte IP
# tatsächlich in einer ausgelieferten Blacklist vor, ist das der
# schmerzhafteste Bug, den dieses System haben kann (Kunden blocken
# legitime Dienste). Single Source of Truth: whitelist.json.
_wl_file = ".github/workflows/whitelist.json"
if os.path.exists(_wl_file):
try:
with open(_wl_file, encoding="utf-8") as _wlf:
_wl_entries = json.load(_wlf).get("entries", [])
_wl_nets = []
_wl_singles = set()
for _e in _wl_entries:
try:
if "/" in _e:
_wl_nets.append(ipaddress.ip_network(_e, strict=False))
else:
_wl_singles.add(_e.strip())
except Exception as _suppressed:
print(f"WARN: suppressed Exception: {_suppressed}", file=sys.stderr)
def _ip_is_whitelisted(ip_str):
if ip_str in _wl_singles:
return True
try:
_addr = ipaddress.ip_address(ip_str.split("/")[0])
return any(_addr in _n for _n in _wl_nets)
except Exception:
return False
_leak_targets = [
"combined_threat_blacklist_ipv4.txt",
"active_blacklist_ipv4.txt",
"blacklist_confidence40_ipv4.txt",
]
for _lf in _leak_targets:
if not os.path.exists(_lf):
continue
_leaks = []
_scanned = 0
try:
with open(_lf, encoding="utf-8", errors="ignore") as _lfh:
for _line in _lfh:
_line = _line.strip()
if not _line or _line.startswith("#"):
continue
_scanned += 1
if _ip_is_whitelisted(_line):
_leaks.append(_line)
if len(_leaks) >= 10: # Stichprobe reicht – Beweis ist Beweis
break
except Exception as _le:
print(f" ⏭ Whitelist-Leak Scan {_lf}: Lese-Fehler ({_le})")
continue
if _leaks:
prod_health.append({
"level": "CRITICAL", "check": f"Whitelist-Leak: {_lf}",
"detail": f"{_lf} enthält whitelisted IPs: {', '.join(_leaks[:5])}{'…' if len(_leaks) > 5 else ''} – Filterung wirkungslos!",
})
print(f" 🔴 Whitelist-Leak {_lf}: {len(_leaks)}+ Treffer ({_leaks[0]} …)")
else:
print(f" ✅ Whitelist-Leak {_lf}: 0 Treffer ({_scanned:,} IPs gescannt)")
except Exception as _wle:
print(f" ⏭ Whitelist-Leak: whitelist.json nicht lesbar ({_wle})")
else:
print(f" ⏭ Whitelist-Leak: {_wl_file} nicht vorhanden")
# ── PH8: Geo-Tagger Aktualität ────────────────────────────────────
# FIX: Ursprünglich als Coverage-Quote (geo_count / combined_count)
# implementiert – das war falsch. Geo-Tagger läuft wöchentlich (So 07:45),
# Combined alle 3h. Die Geo-Datei spiegelt also einen Wochen-Snapshot,
# während Combined kontinuierlich wächst → Coverage erreicht NIE 100%
# und ist als Health-Signal unbrauchbar.
# Richtige Frage: Ist die Geo-Datei innerhalb des Wochen-Rhythmus frisch?
# Schwellen: WARN >9 Tage (1 Run verpasst), CRITICAL >16 Tage (2 Runs verpasst).
_geo_file = "blacklist_geo_enriched.json"
if os.path.exists(_geo_file):
_geo_age_h = _file_age_hours(_geo_file)
if _geo_age_h is not None:
_geo_age_d = _geo_age_h / 24
if _geo_age_d > 16:
prod_health.append({
"level": "CRITICAL", "check": "Geo-Tagger Aktualität",
"detail": f"blacklist_geo_enriched.json ist {_geo_age_d:.0f} Tage alt (CRITICAL >16d ≈ 2 wöchentliche Runs verpasst) – Geo-Tagger steht still",
})
print(f" 🔴 Geo-Tagger Alter: {_geo_age_d:.0f}d (CRITICAL >16d)")
elif _geo_age_d > 9:
prod_health.append({
"level": "WARN", "check": "Geo-Tagger Aktualität",
"detail": f"blacklist_geo_enriched.json ist {_geo_age_d:.0f} Tage alt (WARN >9d ≈ 1 wöchentlicher Run verpasst)",
})
print(f" 🟡 Geo-Tagger Alter: {_geo_age_d:.0f}d (WARN >9d)")
else:
print(f" ✅ Geo-Tagger Alter: {_geo_age_d:.1f}d")
else:
print(" ⏭ Geo-Tagger Aktualität: blacklist_geo_enriched.json nicht vorhanden (noch nie gelaufen)")
# ── PH: Zusammenfassung ───────────────────────────────────────────
_ph_critical = [p for p in prod_health if p["level"] == "CRITICAL"]
_ph_warn = [p for p in prod_health if p["level"] == "WARN"]
print(f"\nProduction Health: 🔴 {len(_ph_critical)} CRITICAL | 🟡 {len(_ph_warn)} WARN")
# Production Health Findings in die globalen errors/warnings einspeisen
for _ph in _ph_critical:
errors.append({"file": "Production Health", "check": _ph["check"], "detail": _ph["detail"]})
for _ph in _ph_warn:
warnings.append({"file": "Production Health", "check": _ph["check"], "detail": _ph["detail"]})
# ── Zusammenfassung ───────────────────────────────────────────────
# FIX: Findings aus Cross-Workflow- und Production-Health-Checks
# (HIGH_QUALITY↔SOURCES Drift, Untrusted Feed hq=True, whitelist.json,
# Cross-Workflow, Production Health, ...) landen direkt in den globalen
# errors/warnings-Listen, ohne in results[fname] eingetragen zu werden.
# Der alte Zaehler schaute nur auf `results` und meldete deshalb
# "19 OK | 0 Fehler", obwohl die Tabelle weiter unten Fehler auflistet.
# Neu: Pro Datei aus globalen Listen + results aggregieren.
_per_file_errs = {}
_per_file_warns = {}
for _e in errors:
_per_file_errs.setdefault(_e["file"], 0)
_per_file_errs[_e["file"]] += 1
for _w in warnings:
_per_file_warns.setdefault(_w["file"], 0)
_per_file_warns[_w["file"]] += 1
for _fn, _r in results.items():
if _r["errors"]:
_per_file_errs[_fn] = _per_file_errs.get(_fn, 0) + len(_r["errors"])
if _r["warnings"]:
_per_file_warns[_fn] = _per_file_warns.get(_fn, 0) + len(_r["warnings"])
_all_buckets = set(_per_file_errs) | set(_per_file_warns) | set(results)
err_count = sum(1 for b in _all_buckets if _per_file_errs.get(b, 0) > 0)
warn_count = sum(1 for b in _all_buckets if _per_file_errs.get(b, 0) == 0
and _per_file_warns.get(b, 0) > 0)
ok_count = sum(1 for b in _all_buckets if _per_file_errs.get(b, 0) == 0
and _per_file_warns.get(b, 0) == 0)
print(f"\nGesamt: ✅ {ok_count} OK | ⚠️ {warn_count} Warnungen | ❌ {err_count} Fehler")
# ── Report ────────────────────────────────────────────────────────
lines = []
lines.append("# Workflow Health Checker – Report")
lines.append(f"**Aktualisiert:** {now_str}\n")
lines.append(f"**Workflows:** {len(files)} | ✅ {ok_count} OK | "
f"⚠️ {warn_count} Warnung | ❌ {err_count} Fehler\n")
lines.append("---")
if errors:
lines.append("## ❌ Fehler (kritisch)\n")
lines.append("| Datei | Check | Detail |")
lines.append("|---|---|---|")
for e in errors:
lines.append(f"| `{e['file']}` | {e['check']} | {e['detail']} |")
lines.append("")
if warnings:
lines.append("## ⚠️ Warnungen\n")
lines.append("| Datei | Check | Detail |")
lines.append("|---|---|---|")
for w in warnings:
lines.append(f"| `{w['file']}` | {w['check']} | {w['detail']} |")
lines.append("")
# ── Production Health Dashboard ───────────────────────────────────
lines.append("## 🏥 Production Health\n")
lines.append(f"**Status:** 🔴 {len(_ph_critical)} CRITICAL | 🟡 {len(_ph_warn)} WARN\n")
if prod_health:
lines.append("| Level | Check | Detail |")
lines.append("|---|---|---|")
for _ph in sorted(prod_health, key=lambda x: (0 if x["level"] == "CRITICAL" else 1)):
_icon = "🔴" if _ph["level"] == "CRITICAL" else "🟡"
lines.append(f"| {_icon} {_ph['level']} | {_ph['check']} | {_ph['detail']} |")
else:
lines.append("*Alle Production Health Checks bestanden.*")
lines.append("")
lines.append("## Übersicht\n")
lines.append("| Workflow | Status | Fehler | Warnungen | Cron |")
lines.append("|---|---|---|---|---|")
for fname, r in results.items():
icon = "❌" if r["errors"] else ("⚠️" if r["warnings"] else "✅ OK")
crons = ", ".join(f"`{c}`" for c in r["cron"]) if r["cron"] else "–"
lines.append(f"| `{fname}` | {icon} | {len(r['errors'])} | {len(r['warnings'])} | {crons} |")
lines.append(f"\n---\n*Generiert: {now_str} | {len(files)} Workflow-Dateien geprüft*")
write_text_atomic("workflow_health_report.md", "\n".join(lines))
print("workflow_health_report.md geschrieben")
# IP-Counts für Drift-Erkennung beim nächsten Lauf sammeln
_current_ip_counts = {}
for _fpath, _, _, _ in _sanity_checks:
_c = _count_ips(_fpath)
if _c is not None:
_current_ip_counts[_fpath] = _c
# FIX: atomar schreiben statt open("...", "w") – diese Datei wird von
# anderen Workflows gelesen. Bei Runner-Kill waehrend dem Schreiben
# bleibt sonst eine halb-geschriebene JSON-Datei zurueck.
write_json_atomic("workflow_health_status.json", {
"updated": now_str,
"total": len(files),
"ok": ok_count,
"warn": warn_count,
"error": err_count,
"errors": errors,
"warnings": warnings,
"ip_counts": _current_ip_counts,
"production_health": {
"critical": len(_ph_critical),
"warn": len(_ph_warn),
"checks": prod_health,
},
}, indent=2)
if err_count > 0:
print(f"\nFEHLER: {err_count} kritische Probleme gefunden – siehe workflow_health_report.md")
else:
print("\nAlle Checks bestanden.")
EOF
- name: Commit
if: always() # FIX: Report auch bei Python-Fehler committen
run: |
git config user.name "github-actions[bot]"
git config user.email "github-actions[bot]@users.noreply.github.com"
git add workflow_health_report.md workflow_health_status.json
if git diff --staged --quiet; then
echo "Keine Änderungen"
else
git commit -m "Workflow Health Checker: $(date -u '+%Y-%m-%d %H:%M') UTC"
for attempt in 1 2 3 4 5; do
echo "Push-Versuch $attempt..."
git fetch origin ${GITHUB_REF_NAME}
git stash --include-untracked 2>/dev/null || true
if git rebase -X theirs origin/${GITHUB_REF_NAME}; then
git stash pop 2>/dev/null || true
if git push origin HEAD:${GITHUB_REF_NAME}; then
echo "Push erfolgreich (Versuch $attempt)"
exit 0
fi
else
git stash pop 2>/dev/null || true
git checkout --theirs workflow_health_report.md \
workflow_health_status.json 2>/dev/null || true
git add workflow_health_report.md \
workflow_health_status.json 2>/dev/null || true
GIT_EDITOR=true git rebase --continue 2>/dev/null || git rebase --skip
fi
sleep $((attempt * 3))
done
echo "FEHLER: Push nach 5 Versuchen fehlgeschlagen"
exit 1
fi