diff --git a/misp_modules/modules/expansion/rdap.py b/misp_modules/modules/expansion/rdap.py new file mode 100644 index 00000000..f6c710ba --- /dev/null +++ b/misp_modules/modules/expansion/rdap.py @@ -0,0 +1,172 @@ +import ipaddress +import json +from urllib.parse import urlparse + +import requests +from pymisp import MISPAttribute, MISPEvent, MISPObject + +from . import check_input_attribute, standard_error_message + +mispattributes = { + "input": ["domain", "hostname", "ip-src", "ip-dst", "url"], + "format": "misp_standard", +} +moduleinfo = { + "version": 1, + "author": "Ali Bhutto", + "description": ( + "An expansion module to query the public RDAP bootstrap (rdap.org) for" + " registration data of a domain, hostname, IP address or URL. RDAP" + " (Registration Data Access Protocol, RFC 9082/9083) is the free," + " unauthenticated and structured successor to WHOIS." + ), + "module-type": ["expansion", "hover"], + "name": "RDAP Lookup", + "logo": "", + "requirements": [], + "features": ( + "The module takes a domain, hostname, IP address or URL attribute as" + " input, resolves a URL to its host, and queries the rdap.org bootstrap" + " which redirects to the authoritative RDAP server for the object. The" + " registrar, registration and expiration dates, name servers, status and" + " registrant information are parsed into a MISP whois object." + ), + "references": ["https://about.rdap.org/", "https://rdap.org/"], + "input": "A domain, hostname, IP address or URL attribute.", + "output": "A whois object holding the registration data returned by RDAP.", +} +moduleconfig = [] + +RDAP_URL = "https://rdap.org" + +# RDAP event actions (RFC 9083) mapped to whois object relations. +_EVENT_MAPPING = { + "registration": "creation-date", + "expiration": "expiration-date", + "last changed": "modification-date", +} + + +def _is_ip(value): + try: + ipaddress.ip_address(value) + except ValueError: + return False + return True + + +def _vcard_value(entity, field): + """Pull a single field (e.g. ``fn``, ``org``, ``email``) out of an RDAP + entity's jCard ``vcardArray`` (RFC 7095), or ``None``.""" + vcard = entity.get("vcardArray") + if not vcard or len(vcard) < 2: + return None + for item in vcard[1]: + # each item is [name, params, type, value] + if len(item) >= 4 and item[0] == field: + value = item[3] + if isinstance(value, list): + value = " ".join(str(part) for part in value if part) + return value + return None + + +def _entity_by_role(entities, role): + for entity in entities or []: + if role in entity.get("roles", []): + return entity + return None + + +def _add_if(misp_object, relation, value): + if value: + misp_object.add_attribute(relation, value) + + +def _parse_rdap(rdap, queried_value, is_ip): + """Build a MISP whois object from an RDAP response.""" + whois = MISPObject("whois") + whois.add_attribute("ip-address" if is_ip else "domain", queried_value) + + registrar = _entity_by_role(rdap.get("entities"), "registrar") + if registrar: + _add_if(whois, "registrar", _vcard_value(registrar, "fn")) + + registrant = _entity_by_role(rdap.get("entities"), "registrant") + if registrant: + _add_if(whois, "registrant-name", _vcard_value(registrant, "fn")) + _add_if(whois, "registrant-org", _vcard_value(registrant, "org")) + _add_if(whois, "registrant-email", _vcard_value(registrant, "email")) + + for event in rdap.get("events", []): + relation = _EVENT_MAPPING.get(event.get("eventAction")) + if relation and event.get("eventDate"): + _add_if(whois, relation, event["eventDate"]) + + for nameserver in rdap.get("nameservers", []): + _add_if(whois, "nameserver", nameserver.get("ldhName")) + + if rdap.get("status"): + whois.add_attribute("text", "status: " + ", ".join(rdap["status"])) + + return whois + + +def handler(q=False): + if q is False: + return False + request = json.loads(q) + if not request.get("attribute") or not check_input_attribute(request["attribute"]): + return {"error": f"{standard_error_message}, which should contain at least a type, a value and an uuid."} + attribute = request["attribute"] + if attribute.get("type") not in mispattributes["input"]: + return {"error": "Wrong input attribute type."} + + value = attribute["value"] + if attribute["type"] == "url": + host = urlparse(value).hostname + if not host: + return {"error": f"Could not extract a host from URL {value}."} + value = host + + is_ip = _is_ip(value) + path = f"ip/{value}" if is_ip else f"domain/{value}" + try: + response = requests.get( + f"{RDAP_URL}/{path}", + headers={"Accept": "application/rdap+json"}, + timeout=15, + ) + except requests.RequestException as e: + return {"error": f"Error while querying rdap.org: {e}"} + + if response.status_code == 404: + return {"error": f"No RDAP record found for {value}."} + if response.status_code != 200: + return {"error": f"Error while querying rdap.org - {response.status_code}: {response.reason}"} + + try: + rdap = response.json() + except ValueError: + return {"error": "RDAP server returned a non-JSON response."} + + misp_event = MISPEvent() + input_attribute = MISPAttribute() + input_attribute.from_dict(**attribute) + misp_event.add_attribute(**input_attribute) + + whois = _parse_rdap(rdap, value, is_ip) + whois.add_reference(input_attribute.uuid, "related-to") + misp_event.add_object(whois) + + event = json.loads(misp_event.to_json()) + return {"results": {key: event[key] for key in ("Attribute", "Object")}} + + +def introspection(): + return mispattributes + + +def version(): + moduleinfo["config"] = moduleconfig + return moduleinfo diff --git a/tests/test_rdap.py b/tests/test_rdap.py new file mode 100644 index 00000000..4b7fdfc9 --- /dev/null +++ b/tests/test_rdap.py @@ -0,0 +1,115 @@ +import json +from unittest.mock import patch + +from misp_modules.modules.expansion import rdap + + +class MockResponse: + def __init__(self, payload, status_code=200, reason="OK"): + self.payload = payload + self.status_code = status_code + self.reason = reason + + def json(self): + if self.payload is None: + raise ValueError("No JSON") + return self.payload + + +_DOMAIN_RDAP = { + "objectClassName": "domain", + "ldhName": "example.com", + "status": ["client transfer prohibited"], + "events": [ + {"eventAction": "registration", "eventDate": "1995-08-14T04:00:00Z"}, + {"eventAction": "expiration", "eventDate": "2026-08-13T04:00:00Z"}, + {"eventAction": "last changed", "eventDate": "2024-08-14T07:01:34Z"}, + ], + "nameservers": [{"ldhName": "a.iana-servers.net"}, {"ldhName": "b.iana-servers.net"}], + "entities": [ + { + "roles": ["registrar"], + "vcardArray": ["vcard", [["version", {}, "text", "4.0"], ["fn", {}, "text", "IANA"]]], + }, + { + "roles": ["registrant"], + "vcardArray": [ + "vcard", + [ + ["version", {}, "text", "4.0"], + ["fn", {}, "text", "John Doe"], + ["org", {}, "text", "Example Inc"], + ["email", {}, "text", "admin@example.com"], + ], + ], + }, + ], +} + + +def _run(attribute, payload, status_code=200): + query = json.dumps({"module": "rdap", "attribute": attribute, "config": {}}) + with patch("misp_modules.modules.expansion.rdap.requests.get") as mock_get: + mock_get.return_value = MockResponse(payload, status_code=status_code) + return rdap.handler(query), mock_get + + +def _whois_object(result): + return next(obj for obj in result["results"]["Object"] if obj["name"] == "whois") + + +def test_domain_builds_whois_object(): + attribute = {"type": "domain", "value": "example.com", "uuid": "5b582d80-7a7e-4b6a-9f22-77656e72bb3b"} + result, mock_get = _run(attribute, _DOMAIN_RDAP) + assert "rdap.org/domain/example.com" in mock_get.call_args.args[0] + + whois = _whois_object(result) + values = {(a["object_relation"], a["value"]) for a in whois["Attribute"]} + assert ("registrar", "IANA") in values + # pymisp normalises the datetime relations from "...Z" to an explicit offset + assert ("creation-date", "1995-08-14T04:00:00+00:00") in values + assert ("expiration-date", "2026-08-13T04:00:00+00:00") in values + assert ("modification-date", "2024-08-14T07:01:34+00:00") in values + assert ("registrant-org", "Example Inc") in values + assert ("registrant-email", "admin@example.com") in values + nameservers = {a["value"] for a in whois["Attribute"] if a["object_relation"] == "nameserver"} + assert nameservers == {"a.iana-servers.net", "b.iana-servers.net"} + + +def test_ip_uses_ip_endpoint_and_relation(): + attribute = {"type": "ip-src", "value": "1.1.1.1", "uuid": "5b582d80-7a7e-4b6a-9f22-77656e72bb3b"} + result, mock_get = _run(attribute, {"objectClassName": "ip network", "events": [], "entities": []}) + assert "rdap.org/ip/1.1.1.1" in mock_get.call_args.args[0] + whois = _whois_object(result) + assert ("ip-address", "1.1.1.1") in {(a["object_relation"], a["value"]) for a in whois["Attribute"]} + + +def test_url_resolves_to_host(): + attribute = { + "type": "url", + "value": "https://sub.example.com/path?q=1", + "uuid": "5b582d80-7a7e-4b6a-9f22-77656e72bb3b", + } + _, mock_get = _run(attribute, {"objectClassName": "domain", "events": [], "entities": []}) + assert "rdap.org/domain/sub.example.com" in mock_get.call_args.args[0] + + +def test_not_found_returns_error(): + attribute = {"type": "domain", "value": "nope.invalid", "uuid": "5b582d80-7a7e-4b6a-9f22-77656e72bb3b"} + result, _ = _run(attribute, None, status_code=404) + assert "error" in result + + +def test_wrong_attribute_type_returns_error(): + attribute = { + "type": "md5", + "value": "d41d8cd98f00b204e9800998ecf8427e", + "uuid": "5b582d80-7a7e-4b6a-9f22-77656e72bb3b", + } + result, _ = _run(attribute, _DOMAIN_RDAP) + assert "error" in result + + +def test_introspection_and_version(): + assert rdap.introspection() == rdap.mispattributes + assert rdap.version()["name"] == "RDAP Lookup"