CVE-to-IP Mapper #67
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| name: CVE-to-IP Mapper | |
| on: | |
| schedule: | |
| - cron: '0 4 * * *' # täglich 04:00 UTC | |
| workflow_dispatch: | |
| env: | |
| FORCE_JAVASCRIPT_ACTIONS_TO_NODE24: true | |
| permissions: | |
| contents: write | |
| actions: read | |
| issues: none | |
| packages: none | |
| pull-requests: none | |
| security-events: none | |
| concurrency: | |
| group: ${{ github.workflow }} | |
| cancel-in-progress: false | |
| jobs: | |
| cve-mapper: | |
| runs-on: ubuntu-latest | |
| timeout-minutes: 15 | |
| steps: | |
| - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 | |
| with: | |
| persist-credentials: true | |
| - name: Map Active CVE Exploiters to IPs | |
| run: | | |
| python3 << 'EOF' | |
| import sys as _sys; _sys.path.insert(0, "scripts") | |
| from netshield_common import ( | |
| load_whitelist, load_fp_set, is_in_fp_set, | |
| is_valid_public_ipv4, is_valid_public_cidr, | |
| is_protected_entry, is_whitelisted, | |
| parse_entries, calculate_confidence, | |
| safe_get_date, parse_date, sort_ips, write_ip_list, | |
| write_text_atomic, | |
| fetch_url, check_local_feed_age, | |
| IPV4_RE, CIDR_RE, TIMESTAMP_RE, | |
| ) | |
| # Init: Whitelist + FP-Set laden | |
| load_whitelist() | |
| load_fp_set() | |
| import urllib.request | |
| import re | |
| import json | |
| import os | |
| import sys | |
| import time | |
| import ipaddress | |
| from datetime import datetime, timezone | |
| now = datetime.now(timezone.utc) | |
| now_str = now.strftime("%Y-%m-%d %H:%M UTC") | |
| # IPV4_RE / CIDR_RE werden aus netshield_common importiert (siehe oben). | |
| # Lokale Re-Definitionen entfernt – ungenutzt, parse_entries() macht | |
| # das Parsing zentral mit der strikten Regex. | |
| def fetch(url, timeout=30, retries=3): | |
| # FIX BUG-WF2-FETCHURL: siehe analoge Aenderung in | |
| # asn_reputation_scorer.yml. Kein SSRF/DNS-Pin/gzip-Schutz im | |
| # alten urlopen-Pfad; fetch_url liefert all das + saubere | |
| # Retry-Strategie fuer 429/5xx. | |
| return fetch_url(url, timeout=timeout, retries=retries, | |
| user_agent="NETSHIELD-CVEMapper/1.0") | |
| # is_valid_public_ipv4() → importiert aus netshield_common | |
| # is_valid_public_cidr() → importiert aus netshield_common | |
| # FP-Set wird über load_fp_set() (Zeile 48) geladen – netshield_common. | |
| # is_in_fp_set() → importiert aus netshield_common | |
| # parse_entries() → importiert aus netshield_common | |
| # ── CVE-Exploit-IP-Feeds (alle ohne API-Key) ────────────────────── | |
| # HINWEIS: Mehrere URLs hier sind BEWUSST identisch mit SOURCES in | |
| # update_combined_blacklist.yml (Feodo, C2-Tracker, URLhaus, IPC2s, ET). | |
| # Zweck: CVE-Mapper kategorisiert IPs nach Bedrohungstyp und schreibt | |
| # cve_exploit_ips.txt. Combined liest diese Datei als LOCAL_FEED ein, | |
| # wodurch die IPs +1 auf today_count erhalten. Das ist gewollt: | |
| # IPs die sowohl im CVE-Tracker als auch in der allgemeinen Feed-Pipeline | |
| # bestätigt werden, sind vertrauenswürdiger (höherer Confidence-Score). | |
| CVE_SOURCES = { | |
| # Feodo Tracker – C2-Server aktiver Malware-Kampagnen | |
| "Feodo C2 (Aggressive)": | |
| "https://feodotracker.abuse.ch/downloads/ipblocklist_aggressive.txt", | |
| "Feodo C2 (Recommended)": | |
| "https://feodotracker.abuse.ch/downloads/ipblocklist.txt", | |
| # ThreatFox – IOCs aktiver Exploits (abuse.ch) | |
| "ThreatFox IOC IPs": | |
| "https://raw.githubusercontent.com/elliotwutingfeng/ThreatFox-IOC-IPs/main/ips.txt", | |
| # FIX C2-MIGRATION: montysecurity/C2-Tracker archiviert → entfernt. | |
| # C2-Abdeckung bleibt über "ThreatFox IOC IPs" und "MISP Botnet C2". | |
| # Botnet C2 IPs | |
| "URLhaus Malware IPs": | |
| "https://urlhaus.abuse.ch/downloads/text/", | |
| # C2 Intel Feeds – IP_RE extrahiert erste Spalte (ioc) korrekt, | |
| # Port-Nummern haben keine 4-Oktet-Struktur → kein Fehlmatch | |
| "MISP Botnet C2": | |
| "https://raw.githubusercontent.com/drb-ra/C2IntelFeeds/master/feeds/IPC2s.csv", | |
| # Emerging Threats aktive Exploit-IPs | |
| "ET Compromised": | |
| "https://rules.emergingthreats.net/blockrules/compromised-ips.txt", | |
| # AbuseIPDB Score-100-Spiegel (30 Tage) – entfernt, da identisch mit | |
| # "abuseipdb_s100_30d" in update_combined_blacklist.yml SOURCES. | |
| # Doppeltes Einlesen würde den today_count in seen_db künstlich erhöhen | |
| # und den Confidence-Score von Turris/AbuseIPDB-IPs fälschlich aufblasen. | |
| } | |
| all_exploit_ips = set() | |
| source_stats = {} | |
| cve_ip_map = {} # CVE/Malware -> set of IPs | |
| for name, url in CVE_SOURCES.items(): | |
| print(f" Lade {name}...") | |
| text = fetch(url) | |
| if not text: | |
| source_stats[name] = 0 | |
| continue | |
| # parse_entries() erkennt alle gaengigen Formate: | |
| # plain IPv4, CIDR, ip:port, FortiGate, ipset, URLhaus, CSV, DROP, netset | |
| # FIX BUG-WL3: use_protected_check=True → Whitelist-IPs werden bereits | |
| # beim Parsen herausgefiltert (vorher: nur is_valid_public_ipv4). | |
| ips = parse_entries(text, use_protected_check=True) | |
| all_exploit_ips.update(ips) | |
| source_stats[name] = len(ips) | |
| # Tag zuweisen | |
| tag = "unknown" | |
| if "feodo" in url.lower(): | |
| tag = "Feodo/Emotet/QakBot C2" | |
| elif "c2-tracker" in url.lower() or "montysecurity" in url.lower(): | |
| tag = "Active C2 (Cobalt Strike / Metasploit)" | |
| elif "threatfox" in url.lower(): | |
| tag = "ThreatFox IOC" | |
| elif "compromised" in url.lower(): | |
| tag = "Compromised Host" | |
| elif "abuseipdb" in url.lower(): | |
| tag = "AbuseIPDB Score 100" | |
| elif "urlhaus" in url.lower(): | |
| tag = "URLhaus Malware C2" | |
| elif "drb-ra" in url.lower() or "c2intelfeeds" in url.lower(): | |
| tag = "MISP Botnet C2" | |
| if tag not in cve_ip_map: | |
| cve_ip_map[tag] = set() | |
| cve_ip_map[tag].update(ips) | |
| print(f" → {len(ips)} IPs ({tag})") | |
| print(f"\nGesamt einzigartige Exploit-IPs: {len(all_exploit_ips)}") | |
| # ── Bestehende Datei mergen ─────────────────────────────────────── | |
| CVE_FILE = "cve_exploit_ips.txt" | |
| existing = set() | |
| if os.path.exists(CVE_FILE): | |
| with open(CVE_FILE) as f: | |
| existing = {l.strip() for l in f if l.strip() and not l.startswith("#")} | |
| def sort_key(entry): | |
| # Sortiert IPs und CIDRs numerisch korrekt | |
| ip_part = entry.split('/')[0] | |
| try: | |
| return (tuple(int(o) for o in ip_part.split('.')), entry) | |
| except Exception: | |
| return ((999,), entry) | |
| # FIX BUG#3: FP- und Whitelist-Filter VOR der Delta-Berechnung anwenden. | |
| # Vorher wurde new_ips vor dem Filtern berechnet → Report-Statistik überhöht. | |
| # FP-Filter: IPs aus false_positives_set.json entfernen | |
| fp_removed = {ip for ip in all_exploit_ips if is_in_fp_set(ip)} | |
| if fp_removed: | |
| all_exploit_ips -= fp_removed | |
| for tag in cve_ip_map: | |
| cve_ip_map[tag] -= fp_removed | |
| print(f"FP-Filter: {len(fp_removed)} IPs entfernt") | |
| # FIX BUG-WL3: Whitelist-Filter – analog zum FP-Filter. | |
| # Defense-in-depth: parse_entries filtert bereits mit use_protected_check, | |
| # aber Altlasten aus bestehenden Feeds könnten durchrutschen. | |
| wl_removed = {ip for ip in all_exploit_ips if is_whitelisted(ip)} | |
| if wl_removed: | |
| all_exploit_ips -= wl_removed | |
| for tag in cve_ip_map: | |
| cve_ip_map[tag] -= wl_removed | |
| print(f"Whitelist-Filter: {len(wl_removed)} IPs entfernt") | |
| # Delta NACH allen Filtern berechnen → korrekte Report-Statistik | |
| new_ips = all_exploit_ips - existing | |
| final = sorted(all_exploit_ips, key=sort_key) | |
| # ── Report immer zuerst schreiben ──────────────────────────────── | |
| lines = [] | |
| lines.append("# CVE-to-IP Mapper – Report") | |
| lines.append(f"**Aktualisiert:** {now_str}\n") | |
| lines.append("---") | |
| lines.append("## Quellen & Treffer\n") | |
| lines.append("| Quelle | IPs |") | |
| lines.append("|---|---|") | |
| for name, count in source_stats.items(): | |
| icon = "✅" if count > 0 else "❌" | |
| lines.append(f"| {icon} {name} | {count} |") | |
| lines.append("") | |
| lines.append("---") | |
| lines.append("## Kategorien\n") | |
| lines.append("| Kategorie | IPs |") | |
| lines.append("|---|---|") | |
| for tag, ips in sorted(cve_ip_map.items(), key=lambda x: len(x[1]), reverse=True): | |
| lines.append(f"| {tag} | **{len(ips)}** |") | |
| lines.append("") | |
| lines.append("---") | |
| lines.append(f"**Gesamt:** {len(final)} IPs | **Neu:** +{len(new_ips)}") | |
| lines.append(f"\n---\n*Generiert: {now_str}*") | |
| write_text_atomic("cve_exploit_report.md", "\n".join(lines)) | |
| print("cve_exploit_report.md geschrieben") | |
| # ── Leerungsschutz ──────────────────────────────────────────────── | |
| # Wenn alle 8 CVE-Feeds fehlgeschlagen sind, liefert final 0 Einträge. | |
| # In diesem Fall bestehende Datei NICHT überschreiben. | |
| MIN_CVE_ENTRIES = 100 | |
| if len(final) < MIN_CVE_ENTRIES: | |
| msg = (f"Nur {len(final)} CVE-IPs (< {MIN_CVE_ENTRIES}) – " | |
| f"behalte bestehende {CVE_FILE}") | |
| print(f"::warning file=cve_to_ip_mapper.yml::{msg}") | |
| print(f"WARNUNG: {msg}") | |
| sys.exit(1) | |
| # FIX ATOMIC: write_text_atomic statt open("w") – bei Runner-Kill | |
| # bleibt die Datei komplett alt oder komplett neu, niemals halb | |
| # geschrieben. Content wird vorab als String aufgebaut, dann atomar | |
| # per tempfile + fsync + os.replace in einem Rutsch geschrieben. | |
| _cve_body_parts = [ | |
| f"# CVE Exploit & C2 IP Blocklist – NETSHIELD\n", | |
| f"# Aktualisiert: {now_str}\n", | |
| f"# Quellen: {len(CVE_SOURCES)}\n", | |
| f"# Gesamt Eintraege: {len(final)} | Neu: {len(new_ips)}\n\n", | |
| ] | |
| written = set() | |
| for tag, ips in sorted(cve_ip_map.items()): | |
| valid = sorted(ips - written, key=sort_key) | |
| if valid: | |
| _cve_body_parts.append(f"# ── {tag} ──\n") | |
| _cve_body_parts.append("\n".join(valid) + "\n\n") | |
| written.update(valid) | |
| write_text_atomic(CVE_FILE, "".join(_cve_body_parts)) | |
| print(f"cve_exploit_ips.txt: {len(final)} Eintraege | +{len(new_ips)} neu") | |
| # CVE-IPs werden NICHT direkt in combined_threat_blacklist_ipv4.txt geschrieben. | |
| # Der combined-blacklist-Workflow liest cve_exploit_ips.txt als Feed-Quelle ein. | |
| print(f"cve_exploit_ips.txt fertig – wird vom combined-blacklist-Workflow eingelesen.") | |
| EOF | |
| - name: Commit | |
| if: always() | |
| run: | | |
| git config user.name "github-actions[bot]" | |
| git config user.email "github-actions[bot]@users.noreply.github.com" | |
| git add cve_exploit_ips.txt cve_exploit_report.md | |
| if git diff --staged --quiet; then | |
| echo "Keine Änderungen" | |
| else | |
| git commit -m "CVE Exploit IPs aktualisiert [$(date -u '+%Y-%m-%d %H:%M') UTC]" | |
| for attempt in 1 2 3 4 5; do | |
| echo "Push-Versuch $attempt..." | |
| git fetch origin ${GITHUB_REF_NAME} | |
| git stash --include-untracked 2>/dev/null || true | |
| if git rebase -X theirs origin/${GITHUB_REF_NAME}; then | |
| git stash pop 2>/dev/null || true | |
| if git push origin HEAD:${GITHUB_REF_NAME}; then | |
| echo "Push erfolgreich (Versuch $attempt)" | |
| exit 0 | |
| fi | |
| else | |
| git stash pop 2>/dev/null || true | |
| git checkout --theirs cve_exploit_ips.txt cve_exploit_report.md 2>/dev/null || true | |
| git add cve_exploit_ips.txt cve_exploit_report.md 2>/dev/null || true | |
| GIT_EDITOR=true git rebase --continue 2>/dev/null || git rebase --skip | |
| fi | |
| sleep $((attempt * 3)) | |
| done | |
| echo "FEHLER: Push nach 5 Versuchen fehlgeschlagen" | |
| exit 1 | |
| fi |