re2o-ns/lib.py

311 lines
7.2 KiB
Python
Raw Normal View History

#! /bin/env python3
import argparse
import collections
import configparser
import logging
import dns.name
import dns.rdataset
import dns.rdatatype
from dns.rdtypes.ANY import CNAME, DNAME, MX, NS, SOA, SSHFP, TXT
from dns.rdtypes.IN import AAAA, SRV, A
# dns name object '@'
AT = dns.name.Name(())
def format_rname(mail: str):
"""
Format a email given by re2o API to a rname dnspython object.
Given an email address in the standard string format `mail@example.tld`
return an email address in the format required by RFC 1035
`mail.example.tld.`
Be careful when using this function. It is a very simple email parsing
function and does support the wide range of possible emails format. It also
does not check is email is valid and does not escape caracters.
Return a `dns.name.Name` object.
"""
local, domain = mail.split("@")
rname = dns.name.Name((local, *dns.name.from_text(domain)))
return rname
def format_mname(name: str):
"""
Format a zone name given by the re2o API to a mname dnspython object.
Given a a name of the format `.zone.domain.tld` output the
`zone.domain.tld.`, formatted accordingly to the RFC 1035.
Return a `dns.name.name` object.
"""
if name[0] == ".":
name = name[1:]
mname = dns.name.from_text(name)
return mname
2021-04-09 05:02:56 +02:00
#def add_to_zone(zone, name, rdata):
# """Add a rdata object to a zone object."""
#
# zone.nodes.setdefault(name, dns.node.Node())
# rdataset = zone.nodes[name].find_rdataset(
# rdata.rdclass,
# rdata.rdtype,
# create=True
# )
# rdataset.add(rdata)
def add_to_zone(zone, name, rdata):
"""Add a rdata object to a zone object."""
rdataset = zone.find_rdataset(
name,
rdata.rdtype,
create=True
)
rdataset.add(rdata)
2021-04-09 05:02:56 +02:00
def soa_handler(re2o_zone, dns_zone):
"""Handler for SOA record"""
2021-04-09 05:02:56 +02:00
soa = re2o_zone["soa"]
logging.debug(f"SOA = {soa}")
2021-04-09 05:02:56 +02:00
ns = re2o_zone["ns_records"][0]["target"]
ns_obj = dns.name.from_text(ns)
soa_obj = SOA.SOA(
2021-04-09 05:02:56 +02:00
dns.rdataclass.IN,
dns.rdatatype.SOA,
ns_obj,
format_rname(soa["mail"]),
#soa["serial"],
0,
soa["refresh"],
soa["retry"],
soa["expire"],
soa["ttl"],
)
2021-04-09 05:02:56 +02:00
add_to_zone(dns_zone, AT, soa_obj)
2021-04-09 05:02:56 +02:00
def originv4_handler(re2o_zone, dns_zone):
"""Handler for the IPv4 origin"""
2021-04-09 05:02:56 +02:00
ipv4_addr = re2o_zone["originv4"]["ipv4"]
logging.debug(f"originv4 = {re2o_zone['originv4']}")
originv4_obj = A.A(
dns.rdataclass.IN,
dns.rdatatype.A,
ipv4_addr
)
2021-04-09 05:02:56 +02:00
add_to_zone(dns_zone, AT, originv4_obj)
2021-04-09 05:02:56 +02:00
def originv6_handler(re2o_zone, dns_zone):
"""Handler for the IPv6 origin"""
ipv6_addr = zone["originv6"] # Yes, re2o is this weird and inconsistent
logging.debug(f"originv6 = {zone['originv6']}")
originv6_obj = AAAA.AAAA(
dns.rdataclass.IN,
dns.rdatatype.AAAA,
ipv6_addr
)
2021-04-09 05:02:56 +02:00
add_to_zone(dns_zone, AT, originv6_obj)
2021-04-09 05:02:56 +02:00
def ns_records_handler(re2o_zone, dns_zone):
"""Handler for the NS record"""
2021-04-09 05:02:56 +02:00
for record in re2o_zone["ns_records"]:
logging.debug(f"NS target = {record}")
target = record["target"]
target_obj = dns.name.from_text(target)
NS_obj = NS.NS(
dns.rdataclass.IN,
dns.rdatatype.NS,
target_obj
)
2021-04-09 05:02:56 +02:00
add_to_zone(dns_zone, AT, NS_obj)
2021-04-09 05:02:56 +02:00
def sshfp_record_handler(re2o_zone, dns_zone):
"""Handler for the SSHFP record"""
2021-04-09 05:02:56 +02:00
for record in re2o_zone["sshfp"]:
# DNS Name object for Hostname
hostname = record["hostname"]
key_name = dns.name.Name(hostname)
for fp in record["sshfp"]:
logging.debug(f"SSHFP = {fp}")
algorithm = fp["algo_id"]
for fp_type in fp["hash"]:
fingerprint = fp["hash"][fp_type]
2021-04-09 05:02:56 +02:00
SSHFP_obj = SSHFP.SSHFP(
dns.rdataclass.IN,
dns.rdatatype.SSHFP,
algorithm,
fp_type,
fingerprint,
)
2021-04-09 05:02:56 +02:00
add_to_zone(dns_zone, key_name, SSHFP_obj)
2021-04-09 05:02:56 +02:00
def mx_records_handler(re2o_zone, dns_zone):
"""Handler for the MX record"""
2021-04-09 05:02:56 +02:00
for record in re2o_zone["mx_records"]:
logging.debug(f"MX = {record}")
preference = record["priority"]
exchange = record["target"]
2021-04-09 05:02:56 +02:00
exchange_obj = dns.name.from_text(exchange)
MX_obj = MX.MX(
dns.rdataclass.IN,
dns.rdatatype.MX,
preference,
exchange_obj
)
2021-04-09 05:02:56 +02:00
add_to_zone(dns_zone, AT, MX_obj)
2021-04-09 05:02:56 +02:00
def txt_records_handler(re2o_zone, dns_zone):
"""Handler for TXT record"""
2021-04-09 05:02:56 +02:00
for record in re2o_zone["txt_records"]:
logging.debug(f"TXT = {record}")
# DNS Name object for field1
name = record["field1"]
key_name = dns.name.Name((name,))
2021-04-09 05:02:56 +02:00
TXT_obj = TXT.TXT(
dns.rdataclass.IN,
dns.rdatatype.TXT,
record["field2"]
)
2021-04-09 05:02:56 +02:00
add_to_zone(dns_zone, key_name, TXT_obj)
2021-04-09 05:02:56 +02:00
def srv_records_handler(re2o_zone, dns_zone):
"""Handler for SRV record"""
2021-04-09 05:02:56 +02:00
for record in re2o_zone["srv_records"]:
logging.debug(f"SRV = {record}")
# DNS Name obj for SRV
key_name = dns.name.from_text(f"{record['service']}_{record['protocol']}")
2021-04-09 05:02:56 +02:00
SRV_obj = SRV.SRV(
dns.rdataclass.IN,
dns.rdatatype.SRV,
record["priority"],
record["weight"],
record["port"],
record["target"]
)
2021-04-09 05:02:56 +02:00
add_to_zone(dns_zone, key_name, SRV_obj)
2021-04-09 05:02:56 +02:00
def a_records_handler(re2o_zone, dns_zone):
"""Handler for A Record"""
2021-04-09 05:02:56 +02:00
for record in re2o_zone["a_records"]:
logging.debug(f"A = {record}")
# DNS Name object for Hostname
hostname = record["hostname"]
key_name = dns.name.Name((hostname,))
ipv4_addr = record["ipv4"]
A_obj = A.A(
dns.rdataclass.IN,
dns.rdatatype.A,
ipv4_addr
)
2021-04-09 05:02:56 +02:00
add_to_zone(dns_zone, key_name, A_obj)
2021-04-09 05:02:56 +02:00
def aaaa_records_handler(re2o_zone, dns_zone):
"""Handler for AAAA Record"""
2021-04-09 05:02:56 +02:00
for record in re2o_zone["aaaa_records"]:
logging.debug(f"AAAA = {record}")
if record["ipv6"] == []:
logging.debug("AAAA record does not have an IPv6. Skipping.")
return
2021-04-09 05:02:56 +02:00
# DNS Name object for Hostname
hostname = record["hostname"]
key_name = dns.name.Name((hostname,))
ipv6_addr = record["ipv6"][0]["ipv6"] # thanks re2o
AAAA_obj = AAAA.AAAA(
dns.rdataclass.IN,
dns.rdatatype.AAAA,
ipv6_addr
)
2021-04-09 05:02:56 +02:00
add_to_zone(dns_zone, key_name, AAAA_obj)
def pass_handler(zone, records):
pass
HANDLERS = {
"soa": soa_handler,
"originv4": originv4_handler,
"originv6": originv6_handler,
"ns_records": ns_records_handler,
"sshfp": sshfp_record_handler,
"mx_records": mx_records_handler,
"txt_records": txt_records_handler,
"srv_records": srv_records_handler,
"a_records": a_records_handler,
"aaaa_records": aaaa_records_handler,
"name": pass_handler,
"cname_records": pass_handler,
"dname_records": pass_handler,
}