390 lines
9.5 KiB
Python
390 lines
9.5 KiB
Python
#! /bin/env python3
|
|
import argparse
|
|
import collections
|
|
import configparser
|
|
import logging
|
|
|
|
import dns.name
|
|
import dns.rdataset
|
|
import dns.rdatatype
|
|
from dns.rdtypes.ANY import CNAME, DNAME, MX, NS, SOA, SSHFP, TXT
|
|
from dns.rdtypes.IN import AAAA, SRV, A
|
|
import dns.resolver
|
|
|
|
# dns name object '@'
|
|
AT = dns.name.Name(())
|
|
|
|
def format_rname(mail: str):
|
|
"""
|
|
Format a email given by re2o API to a rname dnspython object.
|
|
|
|
Given an email address in the standard string format `mail@example.tld`
|
|
return an email address in the format required by RFC 1035
|
|
`mail.example.tld.`
|
|
|
|
Be careful when using this function. It is a very simple email parsing
|
|
function and does support the wide range of possible emails format. It also
|
|
does not check is email is valid and does not escape caracters.
|
|
|
|
Return a `dns.name.Name` object.
|
|
"""
|
|
|
|
local, domain = mail.split("@")
|
|
rname = dns.name.Name((local, *dns.name.from_text(domain)))
|
|
|
|
return rname
|
|
|
|
|
|
def format_re2o_domain(name: str):
|
|
"""
|
|
Format a zone name given by the re2o API to a mname dnspython object.
|
|
|
|
Given a a name of the format `.zone.domain.tld` output the
|
|
`zone.domain.tld.`, formatted accordingly to the RFC 1035.
|
|
|
|
Return a `dns.name.name` object.
|
|
"""
|
|
|
|
if name[0] == ".":
|
|
name = name[1:]
|
|
|
|
name_obj = dns.name.from_text(name)
|
|
|
|
return name_obj
|
|
|
|
def add_to_zone(zone, name, rdata, ttl=None):
|
|
"""Add a rdata object to a zone object."""
|
|
|
|
node = zone.find_node(name, create=True)
|
|
rdataset = node.find_rdataset(
|
|
rdata.rdclass,
|
|
rdata.rdtype,
|
|
create=True
|
|
)
|
|
rdataset.add(rdata, ttl=ttl)
|
|
|
|
|
|
def get_serial(dns_zone):
|
|
"""
|
|
Query the serial number from the NS
|
|
|
|
The parameter `dns_zone` can either be a `str` or a `dns.zone.Zone` object.
|
|
Error handling is added to return 0 if the query is unsucessful.
|
|
"""
|
|
|
|
try:
|
|
answer = dns.resolver.query(dns_zone, 'soa')
|
|
soa = answer.rrset.items[0]
|
|
serial = soa.serial
|
|
except:
|
|
logging.warning(f"[GET SERIAL] failed to query serial for this zone."
|
|
"Fallback to default value 0")
|
|
serial = 0
|
|
|
|
return serial
|
|
|
|
def update_serial(serial, serial_bits=32):
|
|
"""Update serial number
|
|
|
|
According to RFC 1982 and Knot implementation.
|
|
SERIAL_BITS = 32 by default, which means the serial number counter can
|
|
range from 0 to 2^32 - 1.
|
|
"""
|
|
|
|
serial = (serial + 1) % 2**serial_bits
|
|
|
|
return serial
|
|
|
|
def soa_handler(re2o_zone, dns_zone):
|
|
"""Handler for SOA record"""
|
|
|
|
soa = re2o_zone["soa"]
|
|
logging.debug(f"SOA = {soa}")
|
|
|
|
ns = re2o_zone["ns_records"][0]["target"]
|
|
ns_obj = dns.name.from_text(ns)
|
|
|
|
origin = dns_zone.origin
|
|
serial = get_serial(origin)
|
|
serial = update_serial(serial)
|
|
logging.debug(f"[SOA] zone_origin={origin} serial={serial}")
|
|
|
|
soa_obj = SOA.SOA(
|
|
dns.rdataclass.IN,
|
|
dns.rdatatype.SOA,
|
|
ns_obj,
|
|
format_rname(soa["mail"]),
|
|
serial,
|
|
soa["refresh"],
|
|
soa["retry"],
|
|
soa["expire"],
|
|
soa["ttl"],
|
|
)
|
|
|
|
add_to_zone(dns_zone, AT, soa_obj)
|
|
|
|
|
|
def originv4_handler(re2o_zone, dns_zone):
|
|
"""Handler for the IPv4 origin"""
|
|
|
|
ipv4_addr = re2o_zone["originv4"]["ipv4"]
|
|
logging.debug(f"originv4 = {re2o_zone['originv4']}")
|
|
|
|
originv4_obj = A.A(
|
|
dns.rdataclass.IN,
|
|
dns.rdatatype.A,
|
|
ipv4_addr
|
|
)
|
|
|
|
add_to_zone(dns_zone, AT, originv4_obj)
|
|
|
|
|
|
def originv6_handler(re2o_zone, dns_zone):
|
|
"""Handler for the IPv6 origin"""
|
|
|
|
ipv6_addr = zone["originv6"] # Yes, re2o is this weird and inconsistent
|
|
logging.debug(f"originv6 = {zone['originv6']}")
|
|
|
|
originv6_obj = AAAA.AAAA(
|
|
dns.rdataclass.IN,
|
|
dns.rdatatype.AAAA,
|
|
ipv6_addr
|
|
)
|
|
|
|
add_to_zone(dns_zone, AT, originv6_obj)
|
|
|
|
|
|
def ns_records_handler(re2o_zone, dns_zone):
|
|
"""Handler for the NS record"""
|
|
|
|
for record in re2o_zone["ns_records"]:
|
|
|
|
logging.debug(f"NS target = {record}")
|
|
|
|
target = record["target"]
|
|
target_obj = dns.name.from_text(target)
|
|
|
|
NS_obj = NS.NS(
|
|
dns.rdataclass.IN,
|
|
dns.rdatatype.NS,
|
|
target_obj
|
|
)
|
|
|
|
add_to_zone(dns_zone, AT, NS_obj, record["ttl"])
|
|
|
|
|
|
def sshfp_record_handler(re2o_zone, dns_zone):
|
|
"""Handler for the SSHFP record"""
|
|
|
|
for record in re2o_zone["sshfp"]:
|
|
|
|
# DNS Name object for Hostname
|
|
hostname = record["hostname"]
|
|
name_obj = dns.name.Name(hostname)
|
|
|
|
for fp in record["sshfp"]:
|
|
logging.debug(f"SSHFP = {fp}")
|
|
|
|
algorithm = fp["algo_id"]
|
|
|
|
for fp_type in fp["hash"]:
|
|
|
|
fingerprint = fp["hash"][fp_type]
|
|
|
|
SSHFP_obj = SSHFP.SSHFP(
|
|
dns.rdataclass.IN,
|
|
dns.rdatatype.SSHFP,
|
|
algorithm,
|
|
fp_type,
|
|
fingerprint,
|
|
)
|
|
|
|
add_to_zone(dns_zone, name_obj, SSHFP_obj, record["ttl"])
|
|
|
|
|
|
def mx_records_handler(re2o_zone, dns_zone):
|
|
"""Handler for the MX record"""
|
|
|
|
for record in re2o_zone["mx_records"]:
|
|
|
|
logging.debug(f"MX = {record}")
|
|
|
|
preference = record["priority"]
|
|
exchange = record["target"]
|
|
|
|
exchange_obj = dns.name.from_text(exchange)
|
|
|
|
MX_obj = MX.MX(
|
|
dns.rdataclass.IN,
|
|
dns.rdatatype.MX,
|
|
preference,
|
|
exchange_obj
|
|
)
|
|
|
|
add_to_zone(dns_zone, AT, MX_obj, record["ttl"])
|
|
|
|
|
|
def txt_records_handler(re2o_zone, dns_zone):
|
|
"""Handler for TXT record"""
|
|
|
|
for record in re2o_zone["txt_records"]:
|
|
|
|
logging.debug(f"TXT = {record}")
|
|
|
|
# DNS Name object for field1
|
|
name = record["field1"]
|
|
if name == "@":
|
|
name_obj = AT
|
|
else:
|
|
name_obj = format_re2o_domain(name)
|
|
|
|
txt_data = record["field2"]
|
|
|
|
if txt_data[0] == '"' and txt_data[-1] == '"':
|
|
txt_data = txt_data[1:-1]
|
|
logging.debug(f"TXT-DATA = {txt_data}")
|
|
|
|
TXT_obj = TXT.TXT(
|
|
dns.rdataclass.IN,
|
|
dns.rdatatype.TXT,
|
|
txt_data
|
|
)
|
|
|
|
add_to_zone(dns_zone, name_obj, TXT_obj, record["ttl"])
|
|
|
|
|
|
def srv_records_handler(re2o_zone, dns_zone):
|
|
"""Handler for SRV record"""
|
|
|
|
for record in re2o_zone["srv_records"]:
|
|
|
|
logging.debug(f"SRV = {record}")
|
|
|
|
# DNS Name obj for SRV
|
|
name_obj = dns.name.from_text(f"{record['service']}_{record['protocol']}")
|
|
|
|
SRV_obj = SRV.SRV(
|
|
dns.rdataclass.IN,
|
|
dns.rdatatype.SRV,
|
|
record["priority"],
|
|
record["weight"],
|
|
record["port"],
|
|
record["target"]
|
|
)
|
|
|
|
add_to_zone(dns_zone, name_obj, SRV_obj, record["ttl"])
|
|
|
|
|
|
def a_records_handler(re2o_zone, dns_zone):
|
|
"""Handler for A Record"""
|
|
|
|
for record in re2o_zone["a_records"]:
|
|
|
|
logging.debug(f"A = {record}")
|
|
|
|
# DNS Name object for Hostname
|
|
hostname = record["hostname"]
|
|
name_obj = dns.name.Name((hostname,))
|
|
|
|
ipv4_addr = record["ipv4"]
|
|
|
|
A_obj = A.A(
|
|
dns.rdataclass.IN,
|
|
dns.rdatatype.A,
|
|
ipv4_addr
|
|
)
|
|
|
|
add_to_zone(dns_zone, name_obj, A_obj, record["ttl"])
|
|
|
|
|
|
def aaaa_records_handler(re2o_zone, dns_zone):
|
|
"""Handler for AAAA Record"""
|
|
|
|
for record in re2o_zone["aaaa_records"]:
|
|
|
|
logging.debug(f"AAAA = {record}")
|
|
|
|
if record["ipv6"] == []:
|
|
logging.debug("AAAA record does not have an IPv6. Skipping.")
|
|
return
|
|
|
|
# DNS Name object for Hostname
|
|
hostname = record["hostname"]
|
|
name_obj = dns.name.Name((hostname,))
|
|
|
|
ipv6_addr = record["ipv6"][0]["ipv6"] # thanks re2o
|
|
|
|
AAAA_obj = AAAA.AAAA(
|
|
dns.rdataclass.IN,
|
|
dns.rdatatype.AAAA,
|
|
ipv6_addr
|
|
)
|
|
|
|
|
|
add_to_zone(dns_zone, name_obj, AAAA_obj, record["ttl"])
|
|
|
|
|
|
def cname_records_handler(re2o_zone, dns_zone):
|
|
"""Handler fo CNAME records"""
|
|
|
|
for record in re2o_zone["cname_records"]:
|
|
|
|
logging.debug(f"CNAME = {record}")
|
|
|
|
alias_obj = dns.name.from_text(record["alias"])
|
|
name_obj = dns.name.from_text(record["hostname"], origin=None)
|
|
|
|
CNAME_obj = CNAME.CNAME(
|
|
dns.rdataclass.IN,
|
|
dns.rdatatype.CNAME,
|
|
alias_obj
|
|
)
|
|
|
|
add_to_zone(dns_zone, name_obj, CNAME_obj, record["ttl"])
|
|
|
|
|
|
def dname_records_handler(re2o_zone, dns_zone):
|
|
"""Handler for DNAME records"""
|
|
|
|
for record in re2o_zone["dname_records"]:
|
|
|
|
logging.debug(f"DNAME = {record}")
|
|
|
|
name_obj = format_re2o_domain(record["alias"])
|
|
target_obj = format_re2o_domain(record["zone"])
|
|
|
|
DNAME_obj = DNAME.DNAME(
|
|
dns.rdataclass.IN,
|
|
dns.rdatatype.DNAME,
|
|
target_obj
|
|
)
|
|
|
|
add_to_zone(dns_zone, name_obj, DNAME_obj, record["ttl"])
|
|
|
|
|
|
def pass_handler(zone, records):
|
|
"""
|
|
Do nothing (pass)
|
|
|
|
Handler which does nothing, used for edge cases like the pseudo-record
|
|
`name` returned by Re2oAPI or to disable some other handlers in the
|
|
`HANDLERS` variable.
|
|
"""
|
|
pass
|
|
|
|
|
|
HANDLERS = {
|
|
"soa": soa_handler,
|
|
"originv4": originv4_handler,
|
|
"originv6": originv6_handler,
|
|
"ns_records": ns_records_handler,
|
|
"sshfp": sshfp_record_handler,
|
|
"mx_records": mx_records_handler,
|
|
"txt_records": txt_records_handler,
|
|
"srv_records": srv_records_handler,
|
|
"a_records": a_records_handler,
|
|
"aaaa_records": aaaa_records_handler,
|
|
"cname_records": cname_records_handler,
|
|
"dname_records": dname_records_handler,
|
|
"name": pass_handler,
|
|
}
|