11import logging
22
3+ import dns .message
4+ import dns .name
5+ import dns .query
6+ import dns .rdataclass
7+ import dns .rdatatype
8+ import dns .zone
9+ from django .conf import settings
10+
311from desecapi import knot , pdns
412from desecapi .exceptions import KnotException
513
614logger = logging .getLogger (__name__ )
715
16+ _DNSSEC_TYPES = {
17+ "DNSKEY" ,
18+ "CDS" ,
19+ "CDNSKEY" ,
20+ "RRSIG" ,
21+ "NSEC" ,
22+ "NSEC3" ,
23+ "NSEC3PARAM" ,
24+ }
25+
826
927def get_keys (domain ):
1028 if getattr (domain , "nslord" , None ) == "knot" :
@@ -22,3 +40,79 @@ def get_zonefile(domain) -> bytes:
2240 if getattr (domain , "nslord" , None ) == "knot" :
2341 return knot .get_zonefile (domain )
2442 return pdns .get_zonefile (domain )
43+
44+
45+ def get_zonefile_without_dnssec (domain ) -> bytes :
46+ zonefile = get_zonefile (domain ).decode ()
47+ rrsets = zonefile_to_rrsets (domain .name , zonefile )
48+ return rrsets_to_zonefile (domain .name , rrsets ).encode ()
49+
50+
51+ def zonefile_to_rrsets (domain_name : str , zonefile : str ):
52+ zone = dns .zone .from_text (
53+ zonefile ,
54+ origin = dns .name .from_text (domain_name ),
55+ allow_include = False ,
56+ check_origin = False ,
57+ relativize = False ,
58+ )
59+ rrsets = []
60+ for name , rdataset in zone .iterate_rdatasets ():
61+ rtype = dns .rdatatype .to_text (rdataset .rdtype )
62+ if rtype in _DNSSEC_TYPES :
63+ continue
64+ rrsets .append (
65+ {
66+ "name" : name .to_text (),
67+ "type" : rtype ,
68+ "ttl" : rdataset .ttl ,
69+ "records" : [rdata .to_text () for rdata in rdataset ],
70+ }
71+ )
72+ return rrsets
73+
74+
75+ def rrsets_to_zonefile (domain_name : str , rrsets ) -> str :
76+ lines = []
77+ for rrset in rrsets :
78+ name = rrset ["name" ]
79+ ttl = rrset ["ttl" ]
80+ rtype = rrset ["type" ]
81+ for record in rrset ["records" ]:
82+ lines .append (f"{ name } \t { ttl } \t IN\t { rtype } \t { record } " )
83+ return "\n " .join (lines ) + "\n "
84+
85+
86+ def get_csk_private_key (domain ):
87+ if getattr (domain , "nslord" , None ) == "knot" :
88+ return domain .get_csk_private_key ()
89+ private_key = pdns .get_csk_private_key (domain .name )
90+ return private_key or domain .get_csk_private_key ()
91+
92+
93+ def get_soa_serial (domain ):
94+ name = domain .name .rstrip ("." ) + "."
95+ if getattr (domain , "nslord" , None ) == "knot" :
96+ host = settings .NSLORD_KNOT_HOST
97+ port = settings .NSLORD_KNOT_PORT
98+ timeout = settings .NSLORD_KNOT_TIMEOUT
99+ else :
100+ host = "nslord"
101+ port = 53
102+ timeout = 5
103+ query = dns .message .make_query (name , dns .rdatatype .SOA )
104+ host = pdns .gethostbyname_cached (host )
105+ try :
106+ response = dns .query .tcp (query , host , port = port , timeout = timeout )
107+ except Exception :
108+ logger .warning ("SOA serial query failed for %s" , name , exc_info = True )
109+ return None
110+ rrset = response .get_rrset (
111+ dns .message .ANSWER ,
112+ dns .name .from_text (name ),
113+ dns .rdataclass .IN ,
114+ dns .rdatatype .SOA ,
115+ )
116+ if rrset is None :
117+ return None
118+ return rrset [0 ].serial
0 commit comments