Skip to content

Commit e128dc0

Browse files
committed
move classes to files
1 parent 991702e commit e128dc0

3 files changed

Lines changed: 146 additions & 206 deletions

File tree

data_response.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from typing import Any
2+
3+
4+
class DataResponse: # noqa: B903
5+
status: bool # if status is True we have data else a message
6+
data: Any
7+
message: str
8+
9+
def __init__(
10+
self,
11+
*,
12+
status: bool = False,
13+
data: Any = None,
14+
message: str = "",
15+
) -> None:
16+
self.status = status
17+
self.data = data
18+
self.message = message

t2.py

Lines changed: 2 additions & 206 deletions
Original file line numberDiff line numberDiff line change
@@ -1,146 +1,5 @@
1-
import sys
2-
from typing import Any
3-
4-
import tld
5-
import whodap
6-
71
import whoisdomain
8-
9-
10-
class DataResponse: # noqa: B903
11-
status: bool # if status is True we have data else a message
12-
data: Any
13-
message: str
14-
15-
def __init__(
16-
self,
17-
*,
18-
status: bool = False,
19-
data: Any = None,
20-
message: str = "",
21-
) -> None:
22-
self.status = status
23-
self.data = data
24-
self.message = message
25-
26-
27-
class WhoisRdap:
28-
def __init__(self) -> None:
29-
self.dnsc = whodap.DNSClient.new_client() # make sure we cache the primary looup for the tld rdap servers
30-
31-
def fld(self, domain: str) -> str | None:
32-
return tld.get_fld(domain, fix_protocol=True, fail_silently=True)
33-
34-
def do_one_domain(self, domain: str) -> DataResponse:
35-
try:
36-
xfld = str(self.fld(domain))
37-
a = xfld.split(".")
38-
dom = ".".join(a[:-1])
39-
xtld = a[-1]
40-
resp = self.dnsc.lookup(dom, xtld)
41-
return DataResponse(status=True, data=resp.to_whois_dict())
42-
except Exception as e:
43-
print(f"Exception: {e}", file=sys.stderr)
44-
return DataResponse(status=False, message=str(e))
45-
46-
@classmethod
47-
def get_registrant(cls, data: dict[str, Any]) -> tuple[str, str]:
48-
registrant = ""
49-
registrant_country = ""
50-
51-
ll = [
52-
"registrant_email",
53-
"registrant_organization",
54-
"registrant_name",
55-
]
56-
for k in ll:
57-
if not registrant and data[k]:
58-
registrant = data[k]
59-
break
60-
61-
mm = [
62-
"registrant_address",
63-
"registrant_phone",
64-
]
65-
for k in mm:
66-
if not registrant_country and data[k]:
67-
registrant_country = data[k]
68-
break
69-
70-
return registrant, registrant_country
71-
72-
@classmethod
73-
def get_registrar(cls, data: dict[str, Any]) -> str:
74-
ll = [
75-
"registrar_email",
76-
"registrar_name",
77-
"registrar_phone",
78-
]
79-
for k in ll:
80-
if data[k]:
81-
return str(data[k])
82-
83-
return ""
84-
85-
@classmethod
86-
def get_emails(cls, data: dict[str, Any]) -> list[str]:
87-
r: list[str] = []
88-
email_fields: list[str] = [
89-
"abuse_email",
90-
"admin_email",
91-
"billing_email",
92-
"registrant_email",
93-
"registrar_email",
94-
"technical_email",
95-
]
96-
for k in email_fields:
97-
if data[k] and "@" in data[k]:
98-
email = str(data[k])
99-
if email not in r:
100-
r.append(email)
101-
102-
return r
103-
104-
@classmethod
105-
def map_data_to_whoisdomain(
106-
cls,
107-
data: dict[str, Any],
108-
*,
109-
with_rdap_whois: bool = False,
110-
) -> dict[str, Any]:
111-
rr: dict[str, Any] = {}
112-
rr["name"] = data["domain_name"]
113-
rr["dnssec"] = data["dnssec"] == "signed"
114-
if data["abuse_email"]:
115-
rr["abuse_contact"] = data["abuse_email"]
116-
if data["admin_email"]:
117-
rr["admin"] = data["admin_email"]
118-
if data["nameservers"]:
119-
rr["name_servers"] = data["nameservers"]
120-
121-
if data["created_date"]:
122-
rr["creation_date"] = data["created_date"]
123-
if data["updated_date"]:
124-
rr["updated_date"] = data["updated_date"]
125-
if data["expires_date"]:
126-
rr["expiration_date"] = data["expires_date"]
127-
128-
rr["statuses"] = []
129-
rr["status"] = ""
130-
if data["status"]:
131-
rr["statuses"] = data["status"]
132-
if len(rr["statuses"]):
133-
rr["status"] = rr["statuses"][0]
134-
135-
rr["emails"] = cls.get_emails(data)
136-
137-
rr["registrant"], rr["registrant_country"] = cls.get_registrant(data)
138-
rr["registrar"] = cls.get_registrar(data)
139-
140-
if with_rdap_whois:
141-
rr["_rdap_"] = data
142-
143-
return rr
2+
from whois_rdap import WhoisRdap
1443

1454

1465
def xmain() -> None:
@@ -162,7 +21,7 @@ def xmain() -> None:
16221
if rr[xfld]:
16322
print("Server:", server, xfld, rr[xfld])
16423
if dd.status:
165-
print(wr.map_data_to_whoisdomain(dd.data, with_raw=True))
24+
print(wr.map_data_to_whoisdomain(dd.data, with_rdap_whois=True))
16625
if test and test != xfld:
16726
dd = wr.do_one_domain(test)
16827
print("Test:", test, dd.data if dd.status else dd.message)
@@ -171,66 +30,3 @@ def xmain() -> None:
17130

17231

17332
xmain()
174-
175-
"""
176-
#
177-
domain_name -> name
178-
179-
abuse_contact
180-
181-
admin
182-
owner
183-
reseller
184-
185-
registrant
186-
registrant_country
187-
registrar
188-
189-
status -> statuses[0] if len(statuses) > 0 else ""
190-
statuses []
191-
name_servers []
192-
emails []
193-
194-
creation_date
195-
expiration_date
196-
updated_date
197-
198-
dnssec: bool
199-
200-
{
201-
'name': 'google.com', # actual domain used (www.google.com -> google.com)
202-
'tld': 'com',
203-
'fld': '', # new
204-
'registrar': 'MarkMonitor, Inc.',
205-
'registrant': 'Google LLC',
206-
'registrant_country': 'US',
207-
208-
'creation_date': datetime.datetime(1997, 9, 15, 9, 0),
209-
'expiration_date': datetime.datetime(2028, 9, 13, 9, 0),
210-
'last_updated': datetime.datetime(2019, 9, 9, 17, 39, 4),
211-
212-
'status': 'clientUpdateProhibited (https://www.icann.org/epp#clientUpdateProhibited)', # statuses[0] if exists.
213-
'statuses': [
214-
'clientDeleteProhibited (https://www.icann.org/epp#clientDeleteProhibited)',
215-
'clientTransferProhibited (https://www.icann.org/epp#clientTransferProhibited)',
216-
'clientUpdateProhibited (https://www.icann.org/epp#clientUpdateProhibited)',
217-
'serverDeleteProhibited (https://www.icann.org/epp#serverDeleteProhibited)',
218-
'serverTransferProhibited (https://www.icann.org/epp#serverTransferProhibited)',
219-
'serverUpdateProhibited (https://www.icann.org/epp#serverUpdateProhibited)'
220-
],
221-
222-
'dnssec': False,
223-
224-
'name_servers': [
225-
'ns1.google.com',
226-
'ns2.google.com',
227-
'ns3.google.com',
228-
'ns4.google.com'
229-
],
230-
231-
'emails': [
232-
'abusecomplaints@markmonitor.com',
233-
'whoisrequest@markmonitor.com'
234-
]
235-
}
236-
"""

whois_rdap.py

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
import sys
2+
from typing import Any
3+
4+
import tld
5+
import whodap
6+
7+
from data_response import DataResponse
8+
9+
10+
class WhoisRdap:
11+
def __init__(self) -> None:
12+
self.dnsc = whodap.DNSClient.new_client() # make sure we cache the primary looup for the tld rdap servers
13+
14+
def fld(self, domain: str) -> str | None:
15+
return tld.get_fld(domain, fix_protocol=True, fail_silently=True)
16+
17+
def do_one_domain(self, domain: str) -> DataResponse:
18+
try:
19+
xfld = str(self.fld(domain))
20+
a = xfld.split(".")
21+
dom = ".".join(a[:-1])
22+
xtld = a[-1]
23+
resp = self.dnsc.lookup(dom, xtld)
24+
return DataResponse(status=True, data=resp.to_whois_dict())
25+
except Exception as e:
26+
print(f"Exception: {e}", file=sys.stderr)
27+
return DataResponse(status=False, message=str(e))
28+
29+
@classmethod
30+
def get_registrant(cls, data: dict[str, Any]) -> tuple[str, str]:
31+
registrant = ""
32+
registrant_country = ""
33+
34+
ll = [
35+
"registrant_email",
36+
"registrant_organization",
37+
"registrant_name",
38+
]
39+
for k in ll:
40+
if not registrant and data[k]:
41+
registrant = data[k]
42+
break
43+
44+
mm = [
45+
"registrant_address",
46+
"registrant_phone",
47+
]
48+
for k in mm:
49+
if not registrant_country and data[k]:
50+
registrant_country = data[k]
51+
break
52+
53+
return registrant, registrant_country
54+
55+
@classmethod
56+
def get_registrar(cls, data: dict[str, Any]) -> str:
57+
ll = [
58+
"registrar_email",
59+
"registrar_name",
60+
"registrar_phone",
61+
]
62+
for k in ll:
63+
if data[k]:
64+
return str(data[k])
65+
66+
return ""
67+
68+
@classmethod
69+
def get_emails(cls, data: dict[str, Any]) -> list[str]:
70+
r: list[str] = []
71+
email_fields: list[str] = [
72+
"abuse_email",
73+
"admin_email",
74+
"billing_email",
75+
"registrant_email",
76+
"registrar_email",
77+
"technical_email",
78+
]
79+
for k in email_fields:
80+
if data[k] and "@" in data[k]:
81+
email = str(data[k])
82+
if email not in r:
83+
r.append(email)
84+
85+
return r
86+
87+
@classmethod
88+
def map_data_to_whoisdomain(
89+
cls,
90+
data: dict[str, Any],
91+
*,
92+
with_rdap_whois: bool = False,
93+
) -> dict[str, Any]:
94+
rr: dict[str, Any] = {}
95+
rr["name"] = data["domain_name"]
96+
rr["dnssec"] = data["dnssec"] == "signed"
97+
if data["abuse_email"]:
98+
rr["abuse_contact"] = data["abuse_email"]
99+
if data["admin_email"]:
100+
rr["admin"] = data["admin_email"]
101+
if data["nameservers"]:
102+
rr["name_servers"] = data["nameservers"]
103+
104+
if data["created_date"]:
105+
rr["creation_date"] = data["created_date"]
106+
if data["updated_date"]:
107+
rr["updated_date"] = data["updated_date"]
108+
if data["expires_date"]:
109+
rr["expiration_date"] = data["expires_date"]
110+
111+
rr["statuses"] = []
112+
rr["status"] = ""
113+
if data["status"]:
114+
rr["statuses"] = data["status"]
115+
if len(rr["statuses"]):
116+
rr["status"] = rr["statuses"][0]
117+
118+
rr["emails"] = cls.get_emails(data)
119+
120+
rr["registrant"], rr["registrant_country"] = cls.get_registrant(data)
121+
rr["registrar"] = cls.get_registrar(data)
122+
123+
if with_rdap_whois:
124+
rr["_rdap_"] = data
125+
126+
return rr

0 commit comments

Comments
 (0)