Skip to content

CVE-to-IP Mapper

CVE-to-IP Mapper #67

name: CVE-to-IP Mapper
on:
schedule:
- cron: '0 4 * * *' # täglich 04:00 UTC
workflow_dispatch:
env:
FORCE_JAVASCRIPT_ACTIONS_TO_NODE24: true
permissions:
contents: write
actions: read
issues: none
packages: none
pull-requests: none
security-events: none
concurrency:
group: ${{ github.workflow }}
cancel-in-progress: false
jobs:
cve-mapper:
runs-on: ubuntu-latest
timeout-minutes: 15
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
persist-credentials: true
- name: Map Active CVE Exploiters to IPs
run: |
python3 << 'EOF'
import sys as _sys; _sys.path.insert(0, "scripts")
from netshield_common import (
load_whitelist, load_fp_set, is_in_fp_set,
is_valid_public_ipv4, is_valid_public_cidr,
is_protected_entry, is_whitelisted,
parse_entries, calculate_confidence,
safe_get_date, parse_date, sort_ips, write_ip_list,
write_text_atomic,
fetch_url, check_local_feed_age,
IPV4_RE, CIDR_RE, TIMESTAMP_RE,
)
# Init: Whitelist + FP-Set laden
load_whitelist()
load_fp_set()
import urllib.request
import re
import json
import os
import sys
import time
import ipaddress
from datetime import datetime, timezone
now = datetime.now(timezone.utc)
now_str = now.strftime("%Y-%m-%d %H:%M UTC")
# IPV4_RE / CIDR_RE werden aus netshield_common importiert (siehe oben).
# Lokale Re-Definitionen entfernt – ungenutzt, parse_entries() macht
# das Parsing zentral mit der strikten Regex.
def fetch(url, timeout=30, retries=3):
# FIX BUG-WF2-FETCHURL: siehe analoge Aenderung in
# asn_reputation_scorer.yml. Kein SSRF/DNS-Pin/gzip-Schutz im
# alten urlopen-Pfad; fetch_url liefert all das + saubere
# Retry-Strategie fuer 429/5xx.
return fetch_url(url, timeout=timeout, retries=retries,
user_agent="NETSHIELD-CVEMapper/1.0")
# is_valid_public_ipv4() → importiert aus netshield_common
# is_valid_public_cidr() → importiert aus netshield_common
# FP-Set wird über load_fp_set() (Zeile 48) geladen – netshield_common.
# is_in_fp_set() → importiert aus netshield_common
# parse_entries() → importiert aus netshield_common
# ── CVE-Exploit-IP-Feeds (alle ohne API-Key) ──────────────────────
# HINWEIS: Mehrere URLs hier sind BEWUSST identisch mit SOURCES in
# update_combined_blacklist.yml (Feodo, C2-Tracker, URLhaus, IPC2s, ET).
# Zweck: CVE-Mapper kategorisiert IPs nach Bedrohungstyp und schreibt
# cve_exploit_ips.txt. Combined liest diese Datei als LOCAL_FEED ein,
# wodurch die IPs +1 auf today_count erhalten. Das ist gewollt:
# IPs die sowohl im CVE-Tracker als auch in der allgemeinen Feed-Pipeline
# bestätigt werden, sind vertrauenswürdiger (höherer Confidence-Score).
CVE_SOURCES = {
# Feodo Tracker – C2-Server aktiver Malware-Kampagnen
"Feodo C2 (Aggressive)":
"https://feodotracker.abuse.ch/downloads/ipblocklist_aggressive.txt",
"Feodo C2 (Recommended)":
"https://feodotracker.abuse.ch/downloads/ipblocklist.txt",
# ThreatFox – IOCs aktiver Exploits (abuse.ch)
"ThreatFox IOC IPs":
"https://raw.githubusercontent.com/elliotwutingfeng/ThreatFox-IOC-IPs/main/ips.txt",
# FIX C2-MIGRATION: montysecurity/C2-Tracker archiviert → entfernt.
# C2-Abdeckung bleibt über "ThreatFox IOC IPs" und "MISP Botnet C2".
# Botnet C2 IPs
"URLhaus Malware IPs":
"https://urlhaus.abuse.ch/downloads/text/",
# C2 Intel Feeds – IP_RE extrahiert erste Spalte (ioc) korrekt,
# Port-Nummern haben keine 4-Oktet-Struktur → kein Fehlmatch
"MISP Botnet C2":
"https://raw.githubusercontent.com/drb-ra/C2IntelFeeds/master/feeds/IPC2s.csv",
# Emerging Threats aktive Exploit-IPs
"ET Compromised":
"https://rules.emergingthreats.net/blockrules/compromised-ips.txt",
# AbuseIPDB Score-100-Spiegel (30 Tage) – entfernt, da identisch mit
# "abuseipdb_s100_30d" in update_combined_blacklist.yml SOURCES.
# Doppeltes Einlesen würde den today_count in seen_db künstlich erhöhen
# und den Confidence-Score von Turris/AbuseIPDB-IPs fälschlich aufblasen.
}
all_exploit_ips = set()
source_stats = {}
cve_ip_map = {} # CVE/Malware -> set of IPs
for name, url in CVE_SOURCES.items():
print(f" Lade {name}...")
text = fetch(url)
if not text:
source_stats[name] = 0
continue
# parse_entries() erkennt alle gaengigen Formate:
# plain IPv4, CIDR, ip:port, FortiGate, ipset, URLhaus, CSV, DROP, netset
# FIX BUG-WL3: use_protected_check=True → Whitelist-IPs werden bereits
# beim Parsen herausgefiltert (vorher: nur is_valid_public_ipv4).
ips = parse_entries(text, use_protected_check=True)
all_exploit_ips.update(ips)
source_stats[name] = len(ips)
# Tag zuweisen
tag = "unknown"
if "feodo" in url.lower():
tag = "Feodo/Emotet/QakBot C2"
elif "c2-tracker" in url.lower() or "montysecurity" in url.lower():
tag = "Active C2 (Cobalt Strike / Metasploit)"
elif "threatfox" in url.lower():
tag = "ThreatFox IOC"
elif "compromised" in url.lower():
tag = "Compromised Host"
elif "abuseipdb" in url.lower():
tag = "AbuseIPDB Score 100"
elif "urlhaus" in url.lower():
tag = "URLhaus Malware C2"
elif "drb-ra" in url.lower() or "c2intelfeeds" in url.lower():
tag = "MISP Botnet C2"
if tag not in cve_ip_map:
cve_ip_map[tag] = set()
cve_ip_map[tag].update(ips)
print(f" → {len(ips)} IPs ({tag})")
print(f"\nGesamt einzigartige Exploit-IPs: {len(all_exploit_ips)}")
# ── Bestehende Datei mergen ───────────────────────────────────────
CVE_FILE = "cve_exploit_ips.txt"
existing = set()
if os.path.exists(CVE_FILE):
with open(CVE_FILE) as f:
existing = {l.strip() for l in f if l.strip() and not l.startswith("#")}
def sort_key(entry):
# Sortiert IPs und CIDRs numerisch korrekt
ip_part = entry.split('/')[0]
try:
return (tuple(int(o) for o in ip_part.split('.')), entry)
except Exception:
return ((999,), entry)
# FIX BUG#3: FP- und Whitelist-Filter VOR der Delta-Berechnung anwenden.
# Vorher wurde new_ips vor dem Filtern berechnet → Report-Statistik überhöht.
# FP-Filter: IPs aus false_positives_set.json entfernen
fp_removed = {ip for ip in all_exploit_ips if is_in_fp_set(ip)}
if fp_removed:
all_exploit_ips -= fp_removed
for tag in cve_ip_map:
cve_ip_map[tag] -= fp_removed
print(f"FP-Filter: {len(fp_removed)} IPs entfernt")
# FIX BUG-WL3: Whitelist-Filter – analog zum FP-Filter.
# Defense-in-depth: parse_entries filtert bereits mit use_protected_check,
# aber Altlasten aus bestehenden Feeds könnten durchrutschen.
wl_removed = {ip for ip in all_exploit_ips if is_whitelisted(ip)}
if wl_removed:
all_exploit_ips -= wl_removed
for tag in cve_ip_map:
cve_ip_map[tag] -= wl_removed
print(f"Whitelist-Filter: {len(wl_removed)} IPs entfernt")
# Delta NACH allen Filtern berechnen → korrekte Report-Statistik
new_ips = all_exploit_ips - existing
final = sorted(all_exploit_ips, key=sort_key)
# ── Report immer zuerst schreiben ────────────────────────────────
lines = []
lines.append("# CVE-to-IP Mapper – Report")
lines.append(f"**Aktualisiert:** {now_str}\n")
lines.append("---")
lines.append("## Quellen & Treffer\n")
lines.append("| Quelle | IPs |")
lines.append("|---|---|")
for name, count in source_stats.items():
icon = "✅" if count > 0 else "❌"
lines.append(f"| {icon} {name} | {count} |")
lines.append("")
lines.append("---")
lines.append("## Kategorien\n")
lines.append("| Kategorie | IPs |")
lines.append("|---|---|")
for tag, ips in sorted(cve_ip_map.items(), key=lambda x: len(x[1]), reverse=True):
lines.append(f"| {tag} | **{len(ips)}** |")
lines.append("")
lines.append("---")
lines.append(f"**Gesamt:** {len(final)} IPs | **Neu:** +{len(new_ips)}")
lines.append(f"\n---\n*Generiert: {now_str}*")
write_text_atomic("cve_exploit_report.md", "\n".join(lines))
print("cve_exploit_report.md geschrieben")
# ── Leerungsschutz ────────────────────────────────────────────────
# Wenn alle 8 CVE-Feeds fehlgeschlagen sind, liefert final 0 Einträge.
# In diesem Fall bestehende Datei NICHT überschreiben.
MIN_CVE_ENTRIES = 100
if len(final) < MIN_CVE_ENTRIES:
msg = (f"Nur {len(final)} CVE-IPs (< {MIN_CVE_ENTRIES}) – "
f"behalte bestehende {CVE_FILE}")
print(f"::warning file=cve_to_ip_mapper.yml::{msg}")
print(f"WARNUNG: {msg}")
sys.exit(1)
# FIX ATOMIC: write_text_atomic statt open("w") – bei Runner-Kill
# bleibt die Datei komplett alt oder komplett neu, niemals halb
# geschrieben. Content wird vorab als String aufgebaut, dann atomar
# per tempfile + fsync + os.replace in einem Rutsch geschrieben.
_cve_body_parts = [
f"# CVE Exploit & C2 IP Blocklist – NETSHIELD\n",
f"# Aktualisiert: {now_str}\n",
f"# Quellen: {len(CVE_SOURCES)}\n",
f"# Gesamt Eintraege: {len(final)} | Neu: {len(new_ips)}\n\n",
]
written = set()
for tag, ips in sorted(cve_ip_map.items()):
valid = sorted(ips - written, key=sort_key)
if valid:
_cve_body_parts.append(f"# ── {tag} ──\n")
_cve_body_parts.append("\n".join(valid) + "\n\n")
written.update(valid)
write_text_atomic(CVE_FILE, "".join(_cve_body_parts))
print(f"cve_exploit_ips.txt: {len(final)} Eintraege | +{len(new_ips)} neu")
# CVE-IPs werden NICHT direkt in combined_threat_blacklist_ipv4.txt geschrieben.
# Der combined-blacklist-Workflow liest cve_exploit_ips.txt als Feed-Quelle ein.
print(f"cve_exploit_ips.txt fertig – wird vom combined-blacklist-Workflow eingelesen.")
EOF
- name: Commit
if: always()
run: |
git config user.name "github-actions[bot]"
git config user.email "github-actions[bot]@users.noreply.github.com"
git add cve_exploit_ips.txt cve_exploit_report.md
if git diff --staged --quiet; then
echo "Keine Änderungen"
else
git commit -m "CVE Exploit IPs aktualisiert [$(date -u '+%Y-%m-%d %H:%M') UTC]"
for attempt in 1 2 3 4 5; do
echo "Push-Versuch $attempt..."
git fetch origin ${GITHUB_REF_NAME}
git stash --include-untracked 2>/dev/null || true
if git rebase -X theirs origin/${GITHUB_REF_NAME}; then
git stash pop 2>/dev/null || true
if git push origin HEAD:${GITHUB_REF_NAME}; then
echo "Push erfolgreich (Versuch $attempt)"
exit 0
fi
else
git stash pop 2>/dev/null || true
git checkout --theirs cve_exploit_ips.txt cve_exploit_report.md 2>/dev/null || true
git add cve_exploit_ips.txt cve_exploit_report.md 2>/dev/null || true
GIT_EDITOR=true git rebase --continue 2>/dev/null || git rebase --skip
fi
sleep $((attempt * 3))
done
echo "FEHLER: Push nach 5 Versuchen fehlgeschlagen"
exit 1
fi