Skip to content

Commit 4a7a4c1

Browse files
committed
Add RDAP expansion module
Adds an expansion module that queries the public RDAP bootstrap (rdap.org) for the registration data of a domain, hostname, IP address or URL. RDAP (RFC 9082/9083) is the free, unauthenticated, structured successor to WHOIS; misp-modules has whois and passive-DNS modules but no RDAP one. The registrar, registration/expiration/modification dates, name servers, status and registrant details are parsed into a MISP whois object. No configuration or API key is required.
1 parent 0945755 commit 4a7a4c1

2 files changed

Lines changed: 287 additions & 0 deletions

File tree

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
import ipaddress
2+
import json
3+
from urllib.parse import urlparse
4+
5+
import requests
6+
from pymisp import MISPAttribute, MISPEvent, MISPObject
7+
8+
from . import check_input_attribute, standard_error_message
9+
10+
mispattributes = {
11+
"input": ["domain", "hostname", "ip-src", "ip-dst", "url"],
12+
"format": "misp_standard",
13+
}
14+
moduleinfo = {
15+
"version": 1,
16+
"author": "Ali Bhutto",
17+
"description": (
18+
"An expansion module to query the public RDAP bootstrap (rdap.org) for"
19+
" registration data of a domain, hostname, IP address or URL. RDAP"
20+
" (Registration Data Access Protocol, RFC 9082/9083) is the free,"
21+
" unauthenticated and structured successor to WHOIS."
22+
),
23+
"module-type": ["expansion", "hover"],
24+
"name": "RDAP Lookup",
25+
"logo": "",
26+
"requirements": [],
27+
"features": (
28+
"The module takes a domain, hostname, IP address or URL attribute as"
29+
" input, resolves a URL to its host, and queries the rdap.org bootstrap"
30+
" which redirects to the authoritative RDAP server for the object. The"
31+
" registrar, registration and expiration dates, name servers, status and"
32+
" registrant information are parsed into a MISP whois object."
33+
),
34+
"references": ["https://about.rdap.org/", "https://rdap.org/"],
35+
"input": "A domain, hostname, IP address or URL attribute.",
36+
"output": "A whois object holding the registration data returned by RDAP.",
37+
}
38+
moduleconfig = []
39+
40+
RDAP_URL = "https://rdap.org"
41+
42+
# RDAP event actions (RFC 9083) mapped to whois object relations.
43+
_EVENT_MAPPING = {
44+
"registration": "creation-date",
45+
"expiration": "expiration-date",
46+
"last changed": "modification-date",
47+
}
48+
49+
50+
def _is_ip(value):
51+
try:
52+
ipaddress.ip_address(value)
53+
except ValueError:
54+
return False
55+
return True
56+
57+
58+
def _vcard_value(entity, field):
59+
"""Pull a single field (e.g. ``fn``, ``org``, ``email``) out of an RDAP
60+
entity's jCard ``vcardArray`` (RFC 7095), or ``None``."""
61+
vcard = entity.get("vcardArray")
62+
if not vcard or len(vcard) < 2:
63+
return None
64+
for item in vcard[1]:
65+
# each item is [name, params, type, value]
66+
if len(item) >= 4 and item[0] == field:
67+
value = item[3]
68+
if isinstance(value, list):
69+
value = " ".join(str(part) for part in value if part)
70+
return value
71+
return None
72+
73+
74+
def _entity_by_role(entities, role):
75+
for entity in entities or []:
76+
if role in entity.get("roles", []):
77+
return entity
78+
return None
79+
80+
81+
def _add_if(misp_object, relation, value):
82+
if value:
83+
misp_object.add_attribute(relation, value)
84+
85+
86+
def _parse_rdap(rdap, queried_value, is_ip):
87+
"""Build a MISP whois object from an RDAP response."""
88+
whois = MISPObject("whois")
89+
whois.add_attribute("ip-address" if is_ip else "domain", queried_value)
90+
91+
registrar = _entity_by_role(rdap.get("entities"), "registrar")
92+
if registrar:
93+
_add_if(whois, "registrar", _vcard_value(registrar, "fn"))
94+
95+
registrant = _entity_by_role(rdap.get("entities"), "registrant")
96+
if registrant:
97+
_add_if(whois, "registrant-name", _vcard_value(registrant, "fn"))
98+
_add_if(whois, "registrant-org", _vcard_value(registrant, "org"))
99+
_add_if(whois, "registrant-email", _vcard_value(registrant, "email"))
100+
101+
for event in rdap.get("events", []):
102+
relation = _EVENT_MAPPING.get(event.get("eventAction"))
103+
if relation and event.get("eventDate"):
104+
_add_if(whois, relation, event["eventDate"])
105+
106+
for nameserver in rdap.get("nameservers", []):
107+
_add_if(whois, "nameserver", nameserver.get("ldhName"))
108+
109+
if rdap.get("status"):
110+
whois.add_attribute("text", "status: " + ", ".join(rdap["status"]))
111+
112+
return whois
113+
114+
115+
def handler(q=False):
116+
if q is False:
117+
return False
118+
request = json.loads(q)
119+
if not request.get("attribute") or not check_input_attribute(request["attribute"]):
120+
return {"error": f"{standard_error_message}, which should contain at least a type, a value and an uuid."}
121+
attribute = request["attribute"]
122+
if attribute.get("type") not in mispattributes["input"]:
123+
return {"error": "Wrong input attribute type."}
124+
125+
value = attribute["value"]
126+
if attribute["type"] == "url":
127+
host = urlparse(value).hostname
128+
if not host:
129+
return {"error": f"Could not extract a host from URL {value}."}
130+
value = host
131+
132+
is_ip = _is_ip(value)
133+
path = f"ip/{value}" if is_ip else f"domain/{value}"
134+
try:
135+
response = requests.get(
136+
f"{RDAP_URL}/{path}",
137+
headers={"Accept": "application/rdap+json"},
138+
timeout=15,
139+
)
140+
except requests.RequestException as e:
141+
return {"error": f"Error while querying rdap.org: {e}"}
142+
143+
if response.status_code == 404:
144+
return {"error": f"No RDAP record found for {value}."}
145+
if response.status_code != 200:
146+
return {"error": f"Error while querying rdap.org - {response.status_code}: {response.reason}"}
147+
148+
try:
149+
rdap = response.json()
150+
except ValueError:
151+
return {"error": "RDAP server returned a non-JSON response."}
152+
153+
misp_event = MISPEvent()
154+
input_attribute = MISPAttribute()
155+
input_attribute.from_dict(**attribute)
156+
misp_event.add_attribute(**input_attribute)
157+
158+
whois = _parse_rdap(rdap, value, is_ip)
159+
whois.add_reference(input_attribute.uuid, "related-to")
160+
misp_event.add_object(whois)
161+
162+
event = json.loads(misp_event.to_json())
163+
return {"results": {key: event[key] for key in ("Attribute", "Object")}}
164+
165+
166+
def introspection():
167+
return mispattributes
168+
169+
170+
def version():
171+
moduleinfo["config"] = moduleconfig
172+
return moduleinfo

tests/test_rdap.py

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
import json
2+
from unittest.mock import patch
3+
4+
from misp_modules.modules.expansion import rdap
5+
6+
7+
class MockResponse:
8+
def __init__(self, payload, status_code=200, reason="OK"):
9+
self.payload = payload
10+
self.status_code = status_code
11+
self.reason = reason
12+
13+
def json(self):
14+
if self.payload is None:
15+
raise ValueError("No JSON")
16+
return self.payload
17+
18+
19+
_DOMAIN_RDAP = {
20+
"objectClassName": "domain",
21+
"ldhName": "example.com",
22+
"status": ["client transfer prohibited"],
23+
"events": [
24+
{"eventAction": "registration", "eventDate": "1995-08-14T04:00:00Z"},
25+
{"eventAction": "expiration", "eventDate": "2026-08-13T04:00:00Z"},
26+
{"eventAction": "last changed", "eventDate": "2024-08-14T07:01:34Z"},
27+
],
28+
"nameservers": [{"ldhName": "a.iana-servers.net"}, {"ldhName": "b.iana-servers.net"}],
29+
"entities": [
30+
{
31+
"roles": ["registrar"],
32+
"vcardArray": ["vcard", [["version", {}, "text", "4.0"], ["fn", {}, "text", "IANA"]]],
33+
},
34+
{
35+
"roles": ["registrant"],
36+
"vcardArray": [
37+
"vcard",
38+
[
39+
["version", {}, "text", "4.0"],
40+
["fn", {}, "text", "John Doe"],
41+
["org", {}, "text", "Example Inc"],
42+
["email", {}, "text", "admin@example.com"],
43+
],
44+
],
45+
},
46+
],
47+
}
48+
49+
50+
def _run(attribute, payload, status_code=200):
51+
query = json.dumps({"module": "rdap", "attribute": attribute, "config": {}})
52+
with patch("misp_modules.modules.expansion.rdap.requests.get") as mock_get:
53+
mock_get.return_value = MockResponse(payload, status_code=status_code)
54+
return rdap.handler(query), mock_get
55+
56+
57+
def _whois_object(result):
58+
return next(obj for obj in result["results"]["Object"] if obj["name"] == "whois")
59+
60+
61+
def test_domain_builds_whois_object():
62+
attribute = {"type": "domain", "value": "example.com", "uuid": "5b582d80-7a7e-4b6a-9f22-77656e72bb3b"}
63+
result, mock_get = _run(attribute, _DOMAIN_RDAP)
64+
assert "rdap.org/domain/example.com" in mock_get.call_args.args[0]
65+
66+
whois = _whois_object(result)
67+
values = {(a["object_relation"], a["value"]) for a in whois["Attribute"]}
68+
assert ("registrar", "IANA") in values
69+
# pymisp normalises the datetime relations from "...Z" to an explicit offset
70+
assert ("creation-date", "1995-08-14T04:00:00+00:00") in values
71+
assert ("expiration-date", "2026-08-13T04:00:00+00:00") in values
72+
assert ("modification-date", "2024-08-14T07:01:34+00:00") in values
73+
assert ("registrant-org", "Example Inc") in values
74+
assert ("registrant-email", "admin@example.com") in values
75+
nameservers = {a["value"] for a in whois["Attribute"] if a["object_relation"] == "nameserver"}
76+
assert nameservers == {"a.iana-servers.net", "b.iana-servers.net"}
77+
78+
79+
def test_ip_uses_ip_endpoint_and_relation():
80+
attribute = {"type": "ip-src", "value": "1.1.1.1", "uuid": "5b582d80-7a7e-4b6a-9f22-77656e72bb3b"}
81+
result, mock_get = _run(attribute, {"objectClassName": "ip network", "events": [], "entities": []})
82+
assert "rdap.org/ip/1.1.1.1" in mock_get.call_args.args[0]
83+
whois = _whois_object(result)
84+
assert ("ip-address", "1.1.1.1") in {(a["object_relation"], a["value"]) for a in whois["Attribute"]}
85+
86+
87+
def test_url_resolves_to_host():
88+
attribute = {
89+
"type": "url",
90+
"value": "https://sub.example.com/path?q=1",
91+
"uuid": "5b582d80-7a7e-4b6a-9f22-77656e72bb3b",
92+
}
93+
_, mock_get = _run(attribute, {"objectClassName": "domain", "events": [], "entities": []})
94+
assert "rdap.org/domain/sub.example.com" in mock_get.call_args.args[0]
95+
96+
97+
def test_not_found_returns_error():
98+
attribute = {"type": "domain", "value": "nope.invalid", "uuid": "5b582d80-7a7e-4b6a-9f22-77656e72bb3b"}
99+
result, _ = _run(attribute, None, status_code=404)
100+
assert "error" in result
101+
102+
103+
def test_wrong_attribute_type_returns_error():
104+
attribute = {
105+
"type": "md5",
106+
"value": "d41d8cd98f00b204e9800998ecf8427e",
107+
"uuid": "5b582d80-7a7e-4b6a-9f22-77656e72bb3b",
108+
}
109+
result, _ = _run(attribute, _DOMAIN_RDAP)
110+
assert "error" in result
111+
112+
113+
def test_introspection_and_version():
114+
assert rdap.introspection() == rdap.mispattributes
115+
assert rdap.version()["name"] == "RDAP Lookup"

0 commit comments

Comments
 (0)