diff --git a/misp_modules/modules/expansion/email_security_check.py b/misp_modules/modules/expansion/email_security_check.py new file mode 100644 index 00000000..0243c37e --- /dev/null +++ b/misp_modules/modules/expansion/email_security_check.py @@ -0,0 +1,224 @@ +import json + +try: + import dns.resolver + + resolver = dns.resolver.Resolver() + resolver.timeout = 2 + resolver.lifetime = 2 +except ImportError: + print("dnspython is missing, use 'pip install dnspython' to install it.") + +from pymisp import MISPAttribute, MISPEvent, MISPObject + +misperrors = {"error": "Error"} +mispattributes = {"input": ["domain", "hostname"], "format": "misp_standard"} +moduleinfo = { + "version": "0.2", + "author": "Mihai Saveanu", + "description": "Check email security posture (SPF, DKIM, DMARC, MTA-STS) for a domain.", + "module-type": ["expansion", "hover"], + "name": "Email Security Check", + "logo": "", + "requirements": ["dnspython"], + "features": ( + "The module takes a domain or hostname attribute as input and queries DNS" + " for email security records: SPF (TXT), DMARC (_dmarc), DKIM (common selectors)," + " and MTA-STS (_mta-sts). Returns structured MISP attributes with a domain-ip" + " object linking the findings to the queried domain." + ), + "references": [ + "https://tools.ietf.org/html/rfc7208", + "https://tools.ietf.org/html/rfc7489", + ], + "input": "A domain or hostname attribute.", + "output": "Domain-ip MISP object with email security assessment attributes.", +} +moduleconfig = ["custom_resolver"] + +DKIM_SELECTORS = [ + "default", + "google", + "selector1", + "selector2", + "k1", + "mandrill", + "everlytickey1", + "everlytickey2", + "dkim", + "s1", + "s2", + "mailo", +] + + +def _query_txt(domain): + try: + answers = resolver.resolve(domain, "TXT") + return [str(rdata).strip('"') for rdata in answers] + except Exception: + return [] + + +def _check_spf(domain): + records = _query_txt(domain) + spf = [r for r in records if r.startswith("v=spf1")] + if spf: + return {"status": "FOUND", "record": spf[0]} + return {"status": "MISSING", "record": None} + + +def _check_dmarc(domain): + records = _query_txt(f"_dmarc.{domain}") + dmarc = [r for r in records if r.startswith("v=DMARC1")] + if dmarc: + policy = "none" + for part in dmarc[0].split(";"): + part = part.strip() + if part.startswith("p="): + policy = part[2:] + return {"status": "FOUND", "record": dmarc[0], "policy": policy} + return {"status": "MISSING", "record": None, "policy": None} + + +def _check_dkim(domain): + found = [] + for selector in DKIM_SELECTORS: + records = _query_txt(f"{selector}._domainkey.{domain}") + dkim = [r for r in records if "DKIM1" in r or "k=" in r or "p=" in r] + if dkim: + found.append({"selector": selector, "record": dkim[0]}) + return found + + +def _check_mta_sts(domain): + records = _query_txt(f"_mta-sts.{domain}") + sts = [r for r in records if r.startswith("v=STSv1")] + if sts: + return {"status": "FOUND", "record": sts[0]} + return {"status": "MISSING", "record": None} + + +def handler(q=False): + if q is False: + return False + + request = json.loads(q) + + if not request.get("attribute") or not request["attribute"].get("type"): + return {"error": "Missing or invalid attribute."} + + attribute = request["attribute"] + if attribute["type"] not in mispattributes["input"]: + return {"error": f"Unsupported attribute type: {attribute['type']}"} + + domain = attribute["value"] + + if request.get("config", {}).get("custom_resolver"): + resolver.nameservers = [request["config"]["custom_resolver"]] + + spf = _check_spf(domain) + dmarc = _check_dmarc(domain) + dkim = _check_dkim(domain) + mta_sts = _check_mta_sts(domain) + + event = MISPEvent() + initial_attribute = MISPAttribute() + initial_attribute.from_dict(**attribute) + event.add_attribute(**initial_attribute) + + domain_obj = MISPObject("domain-ip") + domain_obj.add_attribute("domain", **{"type": "domain", "value": domain}) + + score = 0 + + if spf["status"] == "FOUND": + score += 1 + domain_obj.add_attribute( + "text", + **{"type": "text", "value": f"SPF: {spf['record']}", "comment": "SPF record", "disable_correlation": True}, + ) + else: + domain_obj.add_attribute( + "text", + **{"type": "text", "value": "SPF: MISSING", "comment": "SPF record", "disable_correlation": True}, + ) + + if dmarc["status"] == "FOUND": + score += 1 + if dmarc["policy"] in ("reject", "quarantine"): + score += 1 + domain_obj.add_attribute( + "text", + **{ + "type": "text", + "value": f"DMARC: {dmarc['policy']} — {dmarc['record']}", + "comment": "DMARC record and policy", + "disable_correlation": True, + }, + ) + else: + domain_obj.add_attribute( + "text", + **{"type": "text", "value": "DMARC: MISSING", "comment": "DMARC record", "disable_correlation": True}, + ) + + if dkim: + score += 1 + selectors = ", ".join(d["selector"] for d in dkim) + domain_obj.add_attribute( + "text", + **{ + "type": "text", + "value": f"DKIM: FOUND ({len(dkim)} selector(s): {selectors})", + "comment": "DKIM selectors found", + "disable_correlation": True, + }, + ) + else: + domain_obj.add_attribute( + "text", + **{ + "type": "text", + "value": "DKIM: NOT FOUND (tested common selectors)", + "comment": "DKIM check", + "disable_correlation": True, + }, + ) + + if mta_sts["status"] == "FOUND": + score += 1 + domain_obj.add_attribute( + "text", + **{"type": "text", "value": f"MTA-STS: {mta_sts['record']}", "comment": "MTA-STS record", "disable_correlation": True}, + ) + else: + domain_obj.add_attribute( + "text", + **{"type": "text", "value": "MTA-STS: MISSING", "comment": "MTA-STS record", "disable_correlation": True}, + ) + + domain_obj.add_attribute( + "text", + **{ + "type": "text", + "value": f"Email Security Score: {score}/5", + "comment": "Overall email security posture score", + "disable_correlation": True, + }, + ) + + domain_obj.add_reference(initial_attribute.uuid, "related-to") + event.add_object(**domain_obj) + + ev = json.loads(event.to_json()) + results = {key: ev[key] for key in ("Attribute", "Object") if key in ev and ev[key]} + return {"results": results} + + +def introspection(): + return mispattributes + + +def version(): + return moduleinfo diff --git a/misp_modules/modules/expansion/ssh_fingerprint.py b/misp_modules/modules/expansion/ssh_fingerprint.py new file mode 100644 index 00000000..8d86cc4c --- /dev/null +++ b/misp_modules/modules/expansion/ssh_fingerprint.py @@ -0,0 +1,156 @@ +import hashlib +import json +import socket + +from pymisp import MISPAttribute, MISPEvent, MISPObject + +misperrors = {"error": "Error"} +mispattributes = {"input": ["ip-src", "ip-dst"], "format": "misp_standard"} +moduleinfo = { + "version": "0.2", + "author": "Mihai Saveanu", + "description": "Grab SSH server fingerprint from an IP and return as passive-ssh MISP object.", + "module-type": ["expansion", "hover"], + "name": "SSH Fingerprint", + "logo": "", + "requirements": [], + "features": ( + "The module takes an IP address attribute as input, connects to port 22," + " performs the SSH protocol version exchange and key exchange init to extract" + " the server host key algorithms and SSH banner. Returns a structured passive-ssh" + " MISP object. Useful for detecting MitM attacks or verifying server identity." + ), + "references": ["https://tools.ietf.org/html/rfc4253"], + "input": "An IP address attribute (ip-src or ip-dst).", + "output": "passive-ssh MISP object with banner and fingerprint.", +} +moduleconfig = ["port", "timeout"] + + +def _grab_ssh_banner(ip, port=22, timeout=5): + result = { + "banner": None, + "kex_algorithms": None, + "host_key_algorithms": None, + "kex_hash": None, + "error": None, + } + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(timeout) + sock.connect((ip, port)) + + banner = sock.recv(256).decode("utf-8", errors="replace").strip() + result["banner"] = banner + + sock.sendall(b"SSH-2.0-MISP_Fingerprint_Module\r\n") + + kex_data = sock.recv(4096) + if len(kex_data) > 21: + payload = kex_data[5:] + + try: + msg_code = payload[0] + if msg_code == 20: + offset = 17 + if offset < len(payload): + kex_len = int.from_bytes( + payload[offset : offset + 4], "big" + ) + offset += 4 + if offset + kex_len <= len(payload): + kex_str = payload[offset : offset + kex_len].decode( + "utf-8", errors="replace" + ) + result["kex_algorithms"] = kex_str + offset += kex_len + + hk_len = int.from_bytes( + payload[offset : offset + 4], "big" + ) + offset += 4 + if offset + hk_len <= len(payload): + hk_str = payload[offset : offset + hk_len].decode( + "utf-8", errors="replace" + ) + result["host_key_algorithms"] = hk_str + except (IndexError, ValueError): + pass + + raw_hash = hashlib.sha256(kex_data).hexdigest() + result["kex_hash"] = raw_hash + + sock.close() + except socket.timeout: + result["error"] = "Connection timed out" + except ConnectionRefusedError: + result["error"] = "Connection refused (port closed)" + except Exception as e: + result["error"] = str(e) + + return result + + +def handler(q=False): + if q is False: + return False + + request = json.loads(q) + + if not request.get("attribute") or not request["attribute"].get("type"): + return {"error": "Missing or invalid attribute."} + + attribute = request["attribute"] + if attribute["type"] not in mispattributes["input"]: + return {"error": f"Unsupported attribute type: {attribute['type']}"} + + ip = attribute["value"] + config = request.get("config", {}) + port = int(config.get("port") or 22) + timeout = float(config.get("timeout") or 5) + + result = _grab_ssh_banner(ip, port, timeout) + + event = MISPEvent() + initial_attribute = MISPAttribute() + initial_attribute.from_dict(**attribute) + event.add_attribute(**initial_attribute) + + if result["error"]: + event.add_attribute( + "text", + f"SSH error for {ip}: {result['error']}", + comment="SSH Fingerprint - error", + ) + ev = json.loads(event.to_json()) + results = {key: ev[key] for key in ("Attribute", "Object") if key in ev and ev[key]} + return {"results": results} + + ssh = MISPObject("passive-ssh") + + ssh.add_attribute("host", **{"type": "ip-dst", "value": ip}) + ssh.add_attribute("port", **{"type": "port", "value": port}) + + if result["banner"]: + ssh.add_attribute("banner", **{"type": "text", "value": result["banner"]}) + + if result["kex_hash"]: + ssh.add_attribute( + "fingerprint", + **{"type": "ssh-fingerprint", "value": result["kex_hash"]}, + ) + + ssh.add_reference(initial_attribute.uuid, "related-to") + event.add_object(**ssh) + + ev = json.loads(event.to_json()) + results = {key: ev[key] for key in ("Attribute", "Object") if key in ev and ev[key]} + return {"results": results} + + +def introspection(): + return mispattributes + + +def version(): + return moduleinfo diff --git a/misp_modules/modules/expansion/tls_certificate_check.py b/misp_modules/modules/expansion/tls_certificate_check.py new file mode 100644 index 00000000..2ab98b83 --- /dev/null +++ b/misp_modules/modules/expansion/tls_certificate_check.py @@ -0,0 +1,185 @@ +import json +import socket +import ssl +from datetime import datetime, timezone + +from pymisp import MISPAttribute, MISPEvent, MISPObject + +misperrors = {"error": "Error"} +mispattributes = {"input": ["domain", "hostname"], "format": "misp_standard"} +moduleinfo = { + "version": "0.2", + "author": "Mihai Saveanu", + "description": "Extract TLS certificate details from a domain and return as x509 MISP object.", + "module-type": ["expansion", "hover"], + "name": "TLS Certificate Check", + "logo": "", + "requirements": [], + "features": ( + "The module takes a domain or hostname attribute as input, connects to port 443," + " performs a TLS handshake and extracts certificate details. Returns a structured" + " x509 MISP object with subject, issuer, validity, SANs, serial number, and" + " protocol version. No external API required." + ), + "references": ["https://tools.ietf.org/html/rfc5246"], + "input": "A domain or hostname attribute.", + "output": "x509 MISP object with certificate details.", +} +moduleconfig = ["port", "timeout"] + + +def _get_cert_info(domain, port=443, timeout=5): + result = { + "subject": None, + "issuer": None, + "issuer_org": None, + "issuer_cn": None, + "serial": None, + "not_before": None, + "not_after": None, + "sans": [], + "protocol": None, + "error": None, + } + + try: + context = ssl.create_default_context() + context.check_hostname = True + context.verify_mode = ssl.CERT_REQUIRED + + with socket.create_connection((domain, port), timeout=timeout) as sock: + with context.wrap_socket(sock, server_hostname=domain) as tls: + cert = tls.getpeercert() + result["protocol"] = tls.version() + + subj = dict(x[0] for x in cert.get("subject", ())) + result["subject"] = subj.get("commonName", "N/A") + + issuer = dict(x[0] for x in cert.get("issuer", ())) + result["issuer_org"] = issuer.get("organizationName", "") + result["issuer_cn"] = issuer.get("commonName", "") + result["issuer"] = ( + f"{result['issuer_org']} ({result['issuer_cn']})" + if result["issuer_org"] + else result["issuer_cn"] + ) + + result["serial"] = str(cert.get("serialNumber", "N/A")) + result["not_before"] = cert.get("notBefore", "") + result["not_after"] = cert.get("notAfter", "") + + for entry_type, entry_value in cert.get("subjectAltName", ()): + if entry_type == "DNS": + result["sans"].append(entry_value) + + except ssl.SSLCertVerificationError as e: + result["error"] = f"Certificate verification failed: {e}" + except ssl.SSLError as e: + result["error"] = f"SSL error: {e}" + except socket.timeout: + result["error"] = "Connection timed out" + except ConnectionRefusedError: + result["error"] = "Connection refused (port closed)" + except Exception as e: + result["error"] = str(e) + + return result + + +def _parse_datetime(date_str): + try: + dt = datetime.strptime(date_str, "%b %d %H:%M:%S %Y %Z") + return dt.replace(tzinfo=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00") + except Exception: + return None + + +def _days_until_expiry(not_after_str): + try: + expiry = datetime.strptime(not_after_str, "%b %d %H:%M:%S %Y %Z") + expiry = expiry.replace(tzinfo=timezone.utc) + delta = expiry - datetime.now(timezone.utc) + return delta.days + except Exception: + return None + + +def handler(q=False): + if q is False: + return False + + request = json.loads(q) + + if not request.get("attribute") or not request["attribute"].get("type"): + return {"error": "Missing or invalid attribute."} + + attribute = request["attribute"] + if attribute["type"] not in mispattributes["input"]: + return {"error": f"Unsupported attribute type: {attribute['type']}"} + + domain = attribute["value"] + config = request.get("config", {}) + port = int(config.get("port") or 443) + timeout = float(config.get("timeout") or 5) + + result = _get_cert_info(domain, port, timeout) + + event = MISPEvent() + initial_attribute = MISPAttribute() + initial_attribute.from_dict(**attribute) + event.add_attribute(**initial_attribute) + + if result["error"]: + event.add_attribute( + "text", + f"TLS error for {domain}: {result['error']}", + comment="TLS Certificate Check - error", + ) + ev = json.loads(event.to_json()) + results = {key: ev[key] for key in ("Attribute", "Object") if key in ev and ev[key]} + return {"results": results} + + x509 = MISPObject("x509") + + x509.add_attribute("serial-number", **{"type": "text", "value": result["serial"]}) + x509.add_attribute("issuer", **{"type": "text", "value": result["issuer"], "disable_correlation": True}) + x509.add_attribute("subject", **{"type": "text", "value": result["subject"]}) + + if result["protocol"]: + x509.add_attribute("version", **{"type": "text", "value": result["protocol"], "disable_correlation": True}) + + not_before_iso = _parse_datetime(result["not_before"]) + not_after_iso = _parse_datetime(result["not_after"]) + + if not_before_iso: + x509.add_attribute("validity-not-before", **{"type": "datetime", "value": not_before_iso, "disable_correlation": True}) + if not_after_iso: + x509.add_attribute("validity-not-after", **{"type": "datetime", "value": not_after_iso, "disable_correlation": True}) + + for san in result["sans"][:50]: + x509.add_attribute("dns_names", **{"type": "hostname", "value": san}) + + days_left = _days_until_expiry(result["not_after"]) + if days_left is not None: + if days_left < 0: + status = f"EXPIRED ({abs(days_left)} days ago)" + elif days_left < 30: + status = f"EXPIRING SOON ({days_left} days left)" + else: + status = f"VALID ({days_left} days remaining)" + x509.add_attribute("text", **{"type": "text", "value": status, "disable_correlation": True}) + + x509.add_reference(initial_attribute.uuid, "related-to") + event.add_object(**x509) + + ev = json.loads(event.to_json()) + results = {key: ev[key] for key in ("Attribute", "Object") if key in ev and ev[key]} + return {"results": results} + + +def introspection(): + return mispattributes + + +def version(): + return moduleinfo