Not fully working lib, but still a good start

master
otthorn 3 years ago
parent c60d69d99b
commit 2ef3962bdf

302
lib.py

@ -0,0 +1,302 @@
#! /bin/env python3
import argparse
import collections
import configparser
import logging
import dns.name
import dns.rdataset
import dns.rdatatype
from dns.rdtypes.ANY import CNAME, DNAME, MX, NS, SOA, SSHFP, TXT
from dns.rdtypes.IN import AAAA, SRV, A
# dns name object '@'
AT = dns.name.Name(())
def format_rname(mail: str):
"""
Format a email given by re2o API to a rname dnspython object.
Given an email address in the standard string format `mail@example.tld`
return an email address in the format required by RFC 1035
`mail.example.tld.`
Be careful when using this function. It is a very simple email parsing
function and does support the wide range of possible emails format. It also
does not check is email is valid and does not escape caracters.
Return a `dns.name.Name` object.
"""
local, domain = mail.split("@")
rname = dns.name.Name((local, *dns.name.from_text(domain)))
return rname
def format_mname(name: str):
"""
Format a zone name given by the re2o API to a mname dnspython object.
Given a a name of the format `.zone.domain.tld` output the
`zone.domain.tld.`, formatted accordingly to the RFC 1035.
Return a `dns.name.name` object.
"""
if name[0] == ".":
name = name[1:]
mname = dns.name.from_text(name)
return mname
def soa_handler(zone, records):
"""Handler for SOA record"""
soa = zone["soa"]
logging.debug(f"SOA = {soa}")
rd = dns.rdataset.Rdataset(dns.rdataclass.IN, dns.rdatatype.SOA)
soa_obj = SOA.SOA(
dns.rdataclass.IN,
dns.rdatatype.SOA,
format_mname(soa["name"]),
format_rname(soa["mail"]),
#soa["serial"],
0,
soa["refresh"],
soa["retry"],
soa["expire"],
soa["ttl"],
)
rd.add(soa_obj)
records[AT].rdatasets.append(rd)
def originv4_handler(zone, records):
"""Handler for the IPv4 origin"""
ipv4_addr = zone["originv4"]["ipv4"]
logging.debug(f"originv4 = {zone['originv4']}")
rd = dns.rdataset.Rdataset(dns.rdataclass.IN, dns.rdatatype.A)
originv4_obj = A.A(
dns.rdataclass.IN,
dns.rdatatype.A,
ipv4_addr
)
rd.add(originv4_obj)
records[AT].rdatasets.append(rd)
def originv6_handler(zone, records):
"""Handler for the IPv6 origin"""
ipv6_addr = zone["originv6"] # Yes, re2o is this weird and inconsistent
logging.debug(f"originv6 = {zone['originv6']}")
rd = dns.rdataset.Rdataset(dns.rdataclass.IN, dns.rdatatype.AAAA)
originv6_obj = AAAA.AAAA(
dns.rdataclass.IN,
dns.rdatatype.AAAA,
ipv6_addr
)
rd.add(originv6_obj)
records[AT].rdatasets.append(rd)
def ns_records_handler(zone, records):
"""Handler for the NS record"""
for record in zone["ns_records"]:
logging.debug(f"NS target = {record}")
target = record["target"]
target_obj = dns.name.from_text(target)
NS_obj = NS.NS(
dns.rdataclass.IN,
dns.rdatatype.NS,
target_obj
)
rd = dns.rdataset.Rdataset(dns.rdataclass.IN, dns.rdatatype.NS)
rd.add(NS_obj)
records[AT].rdatasets.append(rd)
def sshfp_record_handler(zone, records):
"""Handler for the SSHFP record"""
for record in zone["sshfp"]:
# DNS Name object for Hostname
hostname = record["hostname"]
key_name = dns.name.Name(hostname)
for fp in record["sshfp"]:
logging.debug(f"SSHFP = {fp}")
algorithm = fp["algo_id"]
for fp_type in fp["hash"]:
fingerprint = fp["hash"][fp_type]
records[key_name].rdatasets.append(
SSHFP.SSHFP(
dns.rdataclass.IN,
dns.rdatatype.SSHFP,
algorithm,
fp_type,
fingerprint,
)
)
def mx_records_handler(zone, records):
"""Handler for the MX record"""
# dns name object '@'
key_name = dns.name.Name(())
for record in zone["mx_records"]:
logging.debug(f"MX = {record}")
preference = record["priority"]
exchange = record["target"]
records[key_name].rdatasets.append(
MX.MX(
dns.rdataclass.IN,
dns.rdatatype.MX,
preference,
exchange
)
)
def txt_records_handler(zone, records):
"""Handler for TXT record"""
for record in zone["txt_records"]:
logging.debug(f"TXT = {record}")
# DNS Name object for field1
name = record["field1"]
key_name = dns.name.Name((name,))
records[key_name].rdatasets.append(
TXT.TXT(
dns.rdataclass.IN,
dns.rdatatype.TXT,
record["field2"]
)
)
def srv_records_handler(zone, records):
"""Handler for SRV record"""
for record in zone["srv_records"]:
logging.debug(f"SRV = {record}")
# DNS Name obj for SRV
key_name = dns.name.from_text(f"{record['service']}_{record['protocol']}")
records[key_name].rdatasets.append(
SRV.SRV(
dns.rdataclass.IN,
dns.rdatatype.SRV,
record["priority"],
record["weight"],
record["port"],
record["target"]
)
)
def a_records_handler(zone, records):
"""Handler for A Record"""
for record in zone["a_records"]:
logging.debug(f"A = {record}")
# DNS Name object for Hostname
hostname = record["hostname"]
key_name = dns.name.Name((hostname,))
ipv4_addr = record["ipv4"]
rd = dns.rdataset.Rdataset(dns.rdataclass.IN, dns.rdatatype.A)
A_obj = A.A(
dns.rdataclass.IN,
dns.rdatatype.A,
ipv4_addr
)
rd.add(A_obj)
records[key_name].rdatasets.append(rd)
def aaaa_records_handler(zone, records):
"""Handler for AAAA Record"""
for record in zone["aaaa_records"]:
logging.debug(f"AAAA = {record}")
if record["ipv6"] == []:
logging.debug("AAAA record does not have an IPv6. Skipping.")
return
ipv6_addr = record["ipv6"][0]["ipv6"] # thanks re2o
AAAA_obj = AAAA.AAAA(
dns.rdataclass.IN,
dns.rdatatype.AAAA,
ipv6_addr
)
# DNS Name object for Hostname
hostname = record["hostname"]
key_name = dns.name.Name((hostname,))
rd = dns.rdataset.Rdataset(dns.rdataclass.IN, dns.rdatatype.AAAA)
rd.add(AAAA_obj)
records[key_name].rdatasets.append(rd)
def pass_handler(zone, records):
pass
HANDLERS = {
"soa": soa_handler,
"originv4": originv4_handler,
"originv6": originv6_handler,
"ns_records": ns_records_handler,
"sshfp": sshfp_record_handler,
"mx_records": mx_records_handler,
"txt_records": txt_records_handler,
"srv_records": srv_records_handler,
"a_records": a_records_handler,
"aaaa_records": aaaa_records_handler,
"name": pass_handler,
"cname_records": pass_handler,
"dname_records": pass_handler,
}
Loading…
Cancel
Save