diff --git a/misp_modules/modules/expansion/email_security_check.py b/misp_modules/modules/expansion/email_security_check.py new file mode 100644 index 00000000..ca332443 --- /dev/null +++ b/misp_modules/modules/expansion/email_security_check.py @@ -0,0 +1,158 @@ +import json + +try: + import dns.resolver + + resolver = dns.resolver.Resolver() + resolver.timeout = 2 + resolver.lifetime = 2 +except ImportError: + print("dnspython is missing, use 'pip install dnspython' to install it.") + +misperrors = {"error": "Error"} +mispattributes = {"input": ["domain", "hostname"], "output": ["text"]} +moduleinfo = { + "version": "0.1", + "author": "Mihai Catalin Teodosiu", + "description": "Check email security posture (SPF, DKIM, DMARC, MTA-STS) for a domain.", + "module-type": ["expansion", "hover"], + "name": "Email Security Check", + "logo": "", + "requirements": ["dnspython"], + "features": ( + "The module takes a domain or hostname attribute as input and queries DNS" + " for email security records: SPF (TXT), DMARC (_dmarc), DKIM (common selectors)," + " and MTA-STS (_mta-sts). Results include record content and a pass/fail assessment." + ), + "references": [ + "https://tools.ietf.org/html/rfc7208", + "https://tools.ietf.org/html/rfc7489", + ], + "input": "A domain or hostname attribute.", + "output": "Text containing email security posture assessment.", +} +moduleconfig = ["custom_resolver"] + +DKIM_SELECTORS = [ + "default", + "google", + "selector1", + "selector2", + "k1", + "mandrill", + "everlytickey1", + "everlytickey2", + "dkim", + "s1", + "s2", + "mailo", +] + + +def _query_txt(domain): + try: + answers = resolver.resolve(domain, "TXT") + return [str(rdata).strip('"') for rdata in answers] + except Exception: + return [] + + +def _check_spf(domain): + records = _query_txt(domain) + spf = [r for r in records if r.startswith("v=spf1")] + if spf: + return {"status": "FOUND", "record": spf[0]} + return {"status": "MISSING", "record": None} + + +def _check_dmarc(domain): + records = _query_txt(f"_dmarc.{domain}") + dmarc = [r for r in records if r.startswith("v=DMARC1")] + if dmarc: + policy = "none" + for part in dmarc[0].split(";"): + part = part.strip() + if part.startswith("p="): + policy = part[2:] + return {"status": "FOUND", "record": dmarc[0], "policy": policy} + return {"status": "MISSING", "record": None, "policy": None} + + +def _check_dkim(domain): + found = [] + for selector in DKIM_SELECTORS: + records = _query_txt(f"{selector}._domainkey.{domain}") + dkim = [r for r in records if "DKIM1" in r or "k=" in r or "p=" in r] + if dkim: + found.append({"selector": selector, "record": dkim[0]}) + return found + + +def _check_mta_sts(domain): + records = _query_txt(f"_mta-sts.{domain}") + sts = [r for r in records if r.startswith("v=STSv1")] + if sts: + return {"status": "FOUND", "record": sts[0]} + return {"status": "MISSING", "record": None} + + +def handler(q=False): + if q is False: + return False + + request = json.loads(q) + + domain = request.get("domain") or request.get("hostname") + if not domain: + misperrors["error"] = "A domain or hostname attribute is required." + return misperrors + + if request.get("config", {}).get("custom_resolver"): + resolver.nameservers = [request["config"]["custom_resolver"]] + + spf = _check_spf(domain) + dmarc = _check_dmarc(domain) + dkim = _check_dkim(domain) + mta_sts = _check_mta_sts(domain) + + lines = [f"=== Email Security Posture: {domain} ===", ""] + + lines.append(f"SPF: {spf['status']}") + if spf["record"]: + lines.append(f" Record: {spf['record']}") + + lines.append(f"\nDMARC: {dmarc['status']}") + if dmarc["record"]: + lines.append(f" Policy: {dmarc['policy']}") + lines.append(f" Record: {dmarc['record']}") + + if dkim: + lines.append(f"\nDKIM: FOUND ({len(dkim)} selector(s))") + for entry in dkim: + lines.append(f" Selector '{entry['selector']}': {entry['record'][:80]}...") + else: + lines.append("\nDKIM: NOT FOUND (tested common selectors)") + + lines.append(f"\nMTA-STS: {mta_sts['status']}") + if mta_sts["record"]: + lines.append(f" Record: {mta_sts['record']}") + + score = sum([ + 1 if spf["status"] == "FOUND" else 0, + 1 if dmarc["status"] == "FOUND" else 0, + 1 if dmarc.get("policy") in ("reject", "quarantine") else 0, + 1 if dkim else 0, + 1 if mta_sts["status"] == "FOUND" else 0, + ]) + lines.append(f"\nSecurity Score: {score}/5") + + result_text = "\n".join(lines) + return {"results": [{"types": ["text"], "values": result_text}]} + + +def introspection(): + return mispattributes + + +def version(): + return moduleinfo diff --git a/misp_modules/modules/expansion/ssh_fingerprint.py b/misp_modules/modules/expansion/ssh_fingerprint.py new file mode 100644 index 00000000..7c883fcd --- /dev/null +++ b/misp_modules/modules/expansion/ssh_fingerprint.py @@ -0,0 +1,140 @@ +import hashlib +import json +import socket + + +misperrors = {"error": "Error"} +mispattributes = {"input": ["ip-src", "ip-dst"], "output": ["text"]} +moduleinfo = { + "version": "0.1", + "author": "Mihai Catalin Teodosiu", + "description": "Grab SSH server key fingerprint from an IP address for verification or MitM detection.", + "module-type": ["expansion", "hover"], + "name": "SSH Fingerprint", + "logo": "", + "requirements": [], + "features": ( + "The module takes an IP address attribute as input, connects to port 22," + " performs the SSH protocol version exchange and key exchange init to extract" + " the server host key algorithms and SSH banner. Useful for detecting MitM" + " attacks or verifying server identity changes." + ), + "references": ["https://tools.ietf.org/html/rfc4253"], + "input": "An IP address attribute (ip-src or ip-dst).", + "output": "Text containing SSH banner and key exchange information.", +} +moduleconfig = ["port", "timeout"] + + +def _grab_ssh_banner(ip, port=22, timeout=5): + result = { + "banner": None, + "kex_algorithms": None, + "host_key_algorithms": None, + "error": None, + } + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(timeout) + sock.connect((ip, port)) + + banner = sock.recv(256).decode("utf-8", errors="replace").strip() + result["banner"] = banner + + sock.sendall(b"SSH-2.0-MISP_Fingerprint_Module\r\n") + + kex_data = sock.recv(4096) + if len(kex_data) > 21: + payload = kex_data[5:] + if payload and payload[0:1] == b"\x14": + payload = payload[16:] + if payload and payload[0:1] == b"\x14": + pass + + try: + msg_code = payload[0] + if msg_code == 20: + offset = 17 + if offset < len(payload): + kex_len = int.from_bytes( + payload[offset : offset + 4], "big" + ) + offset += 4 + if offset + kex_len <= len(payload): + kex_str = payload[offset : offset + kex_len].decode( + "utf-8", errors="replace" + ) + result["kex_algorithms"] = kex_str + offset += kex_len + + hk_len = int.from_bytes( + payload[offset : offset + 4], "big" + ) + offset += 4 + if offset + hk_len <= len(payload): + hk_str = payload[offset : offset + hk_len].decode( + "utf-8", errors="replace" + ) + result["host_key_algorithms"] = hk_str + except (IndexError, ValueError): + pass + + raw_hash = hashlib.sha256(kex_data).hexdigest() + result["kex_hash"] = raw_hash + + sock.close() + except socket.timeout: + result["error"] = "Connection timed out" + except ConnectionRefusedError: + result["error"] = "Connection refused (port closed)" + except Exception as e: + result["error"] = str(e) + + return result + + +def handler(q=False): + if q is False: + return False + + request = json.loads(q) + + ip = request.get("ip-src") or request.get("ip-dst") + if not ip: + misperrors["error"] = "An IP address attribute is required." + return misperrors + + config = request.get("config", {}) + port = int(config.get("port") or 22) + timeout = float(config.get("timeout") or 5) + + result = _grab_ssh_banner(ip, port, timeout) + + lines = [f"=== SSH Fingerprint: {ip}:{port} ===", ""] + + if result["error"]: + lines.append(f"Error: {result['error']}") + return {"results": [{"types": ["text"], "values": "\n".join(lines)}]} + + if result["banner"]: + lines.append(f"Banner: {result['banner']}") + + if result.get("host_key_algorithms"): + lines.append(f"\nHost Key Algorithms: {result['host_key_algorithms']}") + + if result.get("kex_algorithms"): + lines.append(f"KEX Algorithms: {result['kex_algorithms']}") + + if result.get("kex_hash"): + lines.append(f"\nKEX Init Hash (SHA256): {result['kex_hash']}") + lines.append(" (Compare this hash over time to detect server key changes)") + + return {"results": [{"types": ["text"], "values": "\n".join(lines)}]} + + +def introspection(): + return mispattributes + + +def version(): + return moduleinfo diff --git a/misp_modules/modules/expansion/tls_certificate_check.py b/misp_modules/modules/expansion/tls_certificate_check.py new file mode 100644 index 00000000..0fd3ae43 --- /dev/null +++ b/misp_modules/modules/expansion/tls_certificate_check.py @@ -0,0 +1,147 @@ +import json +import socket +import ssl +from datetime import datetime, timezone + + +misperrors = {"error": "Error"} +mispattributes = {"input": ["domain", "hostname"], "output": ["text"]} +moduleinfo = { + "version": "0.1", + "author": "Mihai Catalin Teodosiu", + "description": "Extract TLS certificate details from a domain: issuer, validity, SANs, chain info.", + "module-type": ["expansion", "hover"], + "name": "TLS Certificate Check", + "logo": "", + "requirements": [], + "features": ( + "The module takes a domain or hostname attribute as input, connects to port 443," + " performs a TLS handshake and extracts certificate details including issuer," + " subject, validity period, Subject Alternative Names, serial number, and" + " protocol version. No external API required — pure Python ssl module." + ), + "references": ["https://tools.ietf.org/html/rfc5246"], + "input": "A domain or hostname attribute.", + "output": "Text containing TLS certificate details and assessment.", +} +moduleconfig = ["port", "timeout"] + + +def _get_cert_info(domain, port=443, timeout=5): + result = { + "subject": None, + "issuer": None, + "serial": None, + "not_before": None, + "not_after": None, + "sans": [], + "protocol": None, + "error": None, + } + + try: + context = ssl.create_default_context() + context.check_hostname = True + context.verify_mode = ssl.CERT_REQUIRED + + with socket.create_connection((domain, port), timeout=timeout) as sock: + with context.wrap_socket(sock, server_hostname=domain) as tls: + cert = tls.getpeercert() + result["protocol"] = tls.version() + + subj = dict(x[0] for x in cert.get("subject", ())) + result["subject"] = subj.get("commonName", "N/A") + + issuer = dict(x[0] for x in cert.get("issuer", ())) + result["issuer"] = ( + f"{issuer.get('organizationName', 'N/A')}" + f" ({issuer.get('commonName', 'N/A')})" + ) + + result["serial"] = str(cert.get("serialNumber", "N/A")) + result["not_before"] = cert.get("notBefore", "N/A") + result["not_after"] = cert.get("notAfter", "N/A") + + for entry_type, entry_value in cert.get("subjectAltName", ()): + if entry_type == "DNS": + result["sans"].append(entry_value) + + except ssl.SSLCertVerificationError as e: + result["error"] = f"Certificate verification failed: {e}" + except ssl.SSLError as e: + result["error"] = f"SSL error: {e}" + except socket.timeout: + result["error"] = "Connection timed out" + except ConnectionRefusedError: + result["error"] = "Connection refused (port closed)" + except Exception as e: + result["error"] = str(e) + + return result + + +def _days_until_expiry(not_after_str): + try: + expiry = datetime.strptime(not_after_str, "%b %d %H:%M:%S %Y %Z") + expiry = expiry.replace(tzinfo=timezone.utc) + delta = expiry - datetime.now(timezone.utc) + return delta.days + except Exception: + return None + + +def handler(q=False): + if q is False: + return False + + request = json.loads(q) + + domain = request.get("domain") or request.get("hostname") + if not domain: + misperrors["error"] = "A domain or hostname attribute is required." + return misperrors + + config = request.get("config", {}) + port = int(config.get("port") or 443) + timeout = float(config.get("timeout") or 5) + + result = _get_cert_info(domain, port, timeout) + + lines = [f"=== TLS Certificate: {domain}:{port} ===", ""] + + if result["error"]: + lines.append(f"Error: {result['error']}") + return {"results": [{"types": ["text"], "values": "\n".join(lines)}]} + + lines.append(f"Subject: {result['subject']}") + lines.append(f"Issuer: {result['issuer']}") + lines.append(f"Serial: {result['serial']}") + lines.append(f"Protocol: {result['protocol']}") + lines.append(f"\nValid From: {result['not_before']}") + lines.append(f"Valid Until: {result['not_after']}") + + days_left = _days_until_expiry(result["not_after"]) + if days_left is not None: + if days_left < 0: + lines.append(f"STATUS: EXPIRED ({abs(days_left)} days ago)") + elif days_left < 30: + lines.append(f"STATUS: EXPIRING SOON ({days_left} days left)") + else: + lines.append(f"STATUS: VALID ({days_left} days remaining)") + + if result["sans"]: + lines.append(f"\nSubject Alternative Names ({len(result['sans'])}):") + for san in result["sans"][:20]: + lines.append(f" - {san}") + if len(result["sans"]) > 20: + lines.append(f" ... and {len(result['sans']) - 20} more") + + return {"results": [{"types": ["text"], "values": "\n".join(lines)}]} + + +def introspection(): + return mispattributes + + +def version(): + return moduleinfo