Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 158 additions & 0 deletions misp_modules/modules/expansion/email_security_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
import json

try:
import dns.resolver

resolver = dns.resolver.Resolver()
resolver.timeout = 2
resolver.lifetime = 2
except ImportError:
print("dnspython is missing, use 'pip install dnspython' to install it.")

misperrors = {"error": "Error"}
mispattributes = {"input": ["domain", "hostname"], "output": ["text"]}
moduleinfo = {
"version": "0.1",
"author": "Mihai Catalin Teodosiu",
"description": "Check email security posture (SPF, DKIM, DMARC, MTA-STS) for a domain.",
"module-type": ["expansion", "hover"],
"name": "Email Security Check",
"logo": "",
"requirements": ["dnspython"],
"features": (
"The module takes a domain or hostname attribute as input and queries DNS"
" for email security records: SPF (TXT), DMARC (_dmarc), DKIM (common selectors),"
" and MTA-STS (_mta-sts). Results include record content and a pass/fail assessment."
),
"references": [
"https://tools.ietf.org/html/rfc7208",
"https://tools.ietf.org/html/rfc7489",
],
"input": "A domain or hostname attribute.",
"output": "Text containing email security posture assessment.",
}
moduleconfig = ["custom_resolver"]

DKIM_SELECTORS = [
"default",
"google",
"selector1",
"selector2",
"k1",
"mandrill",
"everlytickey1",
"everlytickey2",
"dkim",
"s1",
"s2",
"mailo",
]


def _query_txt(domain):
try:
answers = resolver.resolve(domain, "TXT")
return [str(rdata).strip('"') for rdata in answers]
except Exception:
return []


def _check_spf(domain):
records = _query_txt(domain)
spf = [r for r in records if r.startswith("v=spf1")]
if spf:
return {"status": "FOUND", "record": spf[0]}
return {"status": "MISSING", "record": None}


def _check_dmarc(domain):
records = _query_txt(f"_dmarc.{domain}")
dmarc = [r for r in records if r.startswith("v=DMARC1")]
if dmarc:
policy = "none"
for part in dmarc[0].split(";"):
part = part.strip()
if part.startswith("p="):
policy = part[2:]
return {"status": "FOUND", "record": dmarc[0], "policy": policy}
return {"status": "MISSING", "record": None, "policy": None}


def _check_dkim(domain):
found = []
for selector in DKIM_SELECTORS:
records = _query_txt(f"{selector}._domainkey.{domain}")
dkim = [r for r in records if "DKIM1" in r or "k=" in r or "p=" in r]
if dkim:
found.append({"selector": selector, "record": dkim[0]})
return found


def _check_mta_sts(domain):
records = _query_txt(f"_mta-sts.{domain}")
sts = [r for r in records if r.startswith("v=STSv1")]
if sts:
return {"status": "FOUND", "record": sts[0]}
return {"status": "MISSING", "record": None}


def handler(q=False):
if q is False:
return False

request = json.loads(q)

domain = request.get("domain") or request.get("hostname")
if not domain:
misperrors["error"] = "A domain or hostname attribute is required."
return misperrors

if request.get("config", {}).get("custom_resolver"):
resolver.nameservers = [request["config"]["custom_resolver"]]

spf = _check_spf(domain)
dmarc = _check_dmarc(domain)
dkim = _check_dkim(domain)
mta_sts = _check_mta_sts(domain)

lines = [f"=== Email Security Posture: {domain} ===", ""]

lines.append(f"SPF: {spf['status']}")
if spf["record"]:
lines.append(f" Record: {spf['record']}")

lines.append(f"\nDMARC: {dmarc['status']}")
if dmarc["record"]:
lines.append(f" Policy: {dmarc['policy']}")
lines.append(f" Record: {dmarc['record']}")

if dkim:
lines.append(f"\nDKIM: FOUND ({len(dkim)} selector(s))")
for entry in dkim:
lines.append(f" Selector '{entry['selector']}': {entry['record'][:80]}...")
else:
lines.append("\nDKIM: NOT FOUND (tested common selectors)")

lines.append(f"\nMTA-STS: {mta_sts['status']}")
if mta_sts["record"]:
lines.append(f" Record: {mta_sts['record']}")

score = sum([
1 if spf["status"] == "FOUND" else 0,
1 if dmarc["status"] == "FOUND" else 0,
1 if dmarc.get("policy") in ("reject", "quarantine") else 0,
1 if dkim else 0,
1 if mta_sts["status"] == "FOUND" else 0,
])
lines.append(f"\nSecurity Score: {score}/5")

result_text = "\n".join(lines)
return {"results": [{"types": ["text"], "values": result_text}]}


def introspection():
return mispattributes


def version():
return moduleinfo
140 changes: 140 additions & 0 deletions misp_modules/modules/expansion/ssh_fingerprint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import hashlib
import json
import socket


misperrors = {"error": "Error"}
mispattributes = {"input": ["ip-src", "ip-dst"], "output": ["text"]}
moduleinfo = {
"version": "0.1",
"author": "Mihai Catalin Teodosiu",
"description": "Grab SSH server key fingerprint from an IP address for verification or MitM detection.",
"module-type": ["expansion", "hover"],
"name": "SSH Fingerprint",
"logo": "",
"requirements": [],
"features": (
"The module takes an IP address attribute as input, connects to port 22,"
" performs the SSH protocol version exchange and key exchange init to extract"
" the server host key algorithms and SSH banner. Useful for detecting MitM"
" attacks or verifying server identity changes."
),
"references": ["https://tools.ietf.org/html/rfc4253"],
"input": "An IP address attribute (ip-src or ip-dst).",
"output": "Text containing SSH banner and key exchange information.",
}
moduleconfig = ["port", "timeout"]


def _grab_ssh_banner(ip, port=22, timeout=5):
result = {
"banner": None,
"kex_algorithms": None,
"host_key_algorithms": None,
"error": None,
}
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
sock.connect((ip, port))

banner = sock.recv(256).decode("utf-8", errors="replace").strip()
result["banner"] = banner

sock.sendall(b"SSH-2.0-MISP_Fingerprint_Module\r\n")

kex_data = sock.recv(4096)
if len(kex_data) > 21:
payload = kex_data[5:]
if payload and payload[0:1] == b"\x14":
payload = payload[16:]
if payload and payload[0:1] == b"\x14":
pass

try:
msg_code = payload[0]
if msg_code == 20:
offset = 17
if offset < len(payload):
kex_len = int.from_bytes(
payload[offset : offset + 4], "big"
)
offset += 4
if offset + kex_len <= len(payload):
kex_str = payload[offset : offset + kex_len].decode(
"utf-8", errors="replace"
)
result["kex_algorithms"] = kex_str
offset += kex_len

hk_len = int.from_bytes(
payload[offset : offset + 4], "big"
)
offset += 4
if offset + hk_len <= len(payload):
hk_str = payload[offset : offset + hk_len].decode(
"utf-8", errors="replace"
)
result["host_key_algorithms"] = hk_str
except (IndexError, ValueError):
pass

raw_hash = hashlib.sha256(kex_data).hexdigest()
result["kex_hash"] = raw_hash

sock.close()
except socket.timeout:
result["error"] = "Connection timed out"
except ConnectionRefusedError:
result["error"] = "Connection refused (port closed)"
except Exception as e:
result["error"] = str(e)

return result


def handler(q=False):
if q is False:
return False

request = json.loads(q)

ip = request.get("ip-src") or request.get("ip-dst")
if not ip:
misperrors["error"] = "An IP address attribute is required."
return misperrors

config = request.get("config", {})
port = int(config.get("port") or 22)
timeout = float(config.get("timeout") or 5)

result = _grab_ssh_banner(ip, port, timeout)

lines = [f"=== SSH Fingerprint: {ip}:{port} ===", ""]

if result["error"]:
lines.append(f"Error: {result['error']}")
return {"results": [{"types": ["text"], "values": "\n".join(lines)}]}

if result["banner"]:
lines.append(f"Banner: {result['banner']}")

if result.get("host_key_algorithms"):
lines.append(f"\nHost Key Algorithms: {result['host_key_algorithms']}")

if result.get("kex_algorithms"):
lines.append(f"KEX Algorithms: {result['kex_algorithms']}")

if result.get("kex_hash"):
lines.append(f"\nKEX Init Hash (SHA256): {result['kex_hash']}")
lines.append(" (Compare this hash over time to detect server key changes)")

return {"results": [{"types": ["text"], "values": "\n".join(lines)}]}


def introspection():
return mispattributes


def version():
return moduleinfo
Loading