You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

391 lines
9.5 KiB
Python

#! /bin/env python3
import argparse
import collections
import configparser
import logging
import dns.name
import dns.rdataset
import dns.rdatatype
from dns.rdtypes.ANY import CNAME, DNAME, MX, NS, SOA, SSHFP, TXT
from dns.rdtypes.IN import AAAA, SRV, A
import dns.resolver
# dns name object '@'
AT = dns.name.Name(())
def format_rname(mail: str):
"""
Format a email given by re2o API to a rname dnspython object.
Given an email address in the standard string format `mail@example.tld`
return an email address in the format required by RFC 1035
`mail.example.tld.`
Be careful when using this function. It is a very simple email parsing
function and does support the wide range of possible emails format. It also
does not check is email is valid and does not escape caracters.
Return a `dns.name.Name` object.
"""
local, domain = mail.split("@")
rname = dns.name.Name((local, *dns.name.from_text(domain)))
return rname
def format_re2o_domain(name: str):
"""
Format a zone name given by the re2o API to a mname dnspython object.
Given a a name of the format `.zone.domain.tld` output the
`zone.domain.tld.`, formatted accordingly to the RFC 1035.
Return a `dns.name.name` object.
"""
if name[0] == ".":
name = name[1:]
name_obj = dns.name.from_text(name)
return name_obj
def add_to_zone(zone, name, rdata, ttl=None):
"""Add a rdata object to a zone object."""
node = zone.find_node(name, create=True)
rdataset = node.find_rdataset(
rdata.rdclass,
rdata.rdtype,
create=True
)
rdataset.add(rdata, ttl=ttl)
def get_serial(dns_zone):
"""
Query the serial number from the NS
The parameter `dns_zone` can either be a `str` or a `dns.zone.Zone` object.
Error handling is added to return 0 if the query is unsucessful.
"""
try:
answer = dns.resolver.query(dns_zone, 'soa')
soa = answer.rrset.items[0]
serial = soa.serial
except:
logging.warning(f"[GET SERIAL] failed to query serial for this zone."
"Fallback to default value 0")
serial = 0
return serial
def update_serial(serial, serial_bits=32):
"""Update serial number
According to RFC 1982 and Knot implementation.
SERIAL_BITS = 32 by default, which means the serial number counter can
range from 0 to 2^32 - 1.
"""
serial = (serial + 1) % 2**serial_bits
return serial
def soa_handler(re2o_zone, dns_zone):
"""Handler for SOA record"""
soa = re2o_zone["soa"]
logging.debug(f"SOA = {soa}")
ns = re2o_zone["ns_records"][0]["target"]
ns_obj = dns.name.from_text(ns)
origin = dns_zone.origin
serial = get_serial(origin)
serial = update_serial(serial)
logging.debug(f"[SOA] zone_origin={origin} serial={serial}")
soa_obj = SOA.SOA(
dns.rdataclass.IN,
dns.rdatatype.SOA,
ns_obj,
format_rname(soa["mail"]),
serial,
soa["refresh"],
soa["retry"],
soa["expire"],
soa["ttl"],
)
add_to_zone(dns_zone, AT, soa_obj)
def originv4_handler(re2o_zone, dns_zone):
"""Handler for the IPv4 origin"""
ipv4_addr = re2o_zone["originv4"]["ipv4"]
logging.debug(f"originv4 = {re2o_zone['originv4']}")
originv4_obj = A.A(
dns.rdataclass.IN,
dns.rdatatype.A,
ipv4_addr
)
add_to_zone(dns_zone, AT, originv4_obj)
def originv6_handler(re2o_zone, dns_zone):
"""Handler for the IPv6 origin"""
ipv6_addr = zone["originv6"] # Yes, re2o is this weird and inconsistent
logging.debug(f"originv6 = {zone['originv6']}")
originv6_obj = AAAA.AAAA(
dns.rdataclass.IN,
dns.rdatatype.AAAA,
ipv6_addr
)
add_to_zone(dns_zone, AT, originv6_obj)
def ns_records_handler(re2o_zone, dns_zone):
"""Handler for the NS record"""
for record in re2o_zone["ns_records"]:
logging.debug(f"NS target = {record}")
target = record["target"]
target_obj = dns.name.from_text(target)
NS_obj = NS.NS(
dns.rdataclass.IN,
dns.rdatatype.NS,
target_obj
)
add_to_zone(dns_zone, AT, NS_obj, record["ttl"])
def sshfp_record_handler(re2o_zone, dns_zone):
"""Handler for the SSHFP record"""
for record in re2o_zone["sshfp"]:
# DNS Name object for Hostname
hostname = record["hostname"]
name_obj = dns.name.Name(hostname)
for fp in record["sshfp"]:
logging.debug(f"SSHFP = {fp}")
algorithm = fp["algo_id"]
for fp_type in fp["hash"]:
fingerprint = fp["hash"][fp_type]
SSHFP_obj = SSHFP.SSHFP(
dns.rdataclass.IN,
dns.rdatatype.SSHFP,
algorithm,
fp_type,
fingerprint,
)
add_to_zone(dns_zone, name_obj, SSHFP_obj, record["ttl"])
def mx_records_handler(re2o_zone, dns_zone):
"""Handler for the MX record"""
for record in re2o_zone["mx_records"]:
logging.debug(f"MX = {record}")
preference = record["priority"]
exchange = record["target"]
exchange_obj = dns.name.from_text(exchange)
MX_obj = MX.MX(
dns.rdataclass.IN,
dns.rdatatype.MX,
preference,
exchange_obj
)
add_to_zone(dns_zone, AT, MX_obj, record["ttl"])
def txt_records_handler(re2o_zone, dns_zone):
"""Handler for TXT record"""
for record in re2o_zone["txt_records"]:
logging.debug(f"TXT = {record}")
# DNS Name object for field1
name = record["field1"]
if name == "@":
name_obj = AT
else:
name_obj = format_re2o_domain(name)
txt_data = record["field2"]
if txt_data[0] == '"' and txt_data[-1] == '"':
txt_data = txt_data[1:-1]
logging.debug(f"TXT-DATA = {txt_data}")
TXT_obj = TXT.TXT(
dns.rdataclass.IN,
dns.rdatatype.TXT,
txt_data
)
add_to_zone(dns_zone, name_obj, TXT_obj, record["ttl"])
def srv_records_handler(re2o_zone, dns_zone):
"""Handler for SRV record"""
for record in re2o_zone["srv_records"]:
logging.debug(f"SRV = {record}")
# DNS Name obj for SRV
name_obj = dns.name.from_text(f"{record['service']}_{record['protocol']}")
SRV_obj = SRV.SRV(
dns.rdataclass.IN,
dns.rdatatype.SRV,
record["priority"],
record["weight"],
record["port"],
record["target"]
)
add_to_zone(dns_zone, name_obj, SRV_obj, record["ttl"])
def a_records_handler(re2o_zone, dns_zone):
"""Handler for A Record"""
for record in re2o_zone["a_records"]:
logging.debug(f"A = {record}")
# DNS Name object for Hostname
hostname = record["hostname"]
name_obj = dns.name.Name((hostname,))
ipv4_addr = record["ipv4"]
A_obj = A.A(
dns.rdataclass.IN,
dns.rdatatype.A,
ipv4_addr
)
add_to_zone(dns_zone, name_obj, A_obj, record["ttl"])
def aaaa_records_handler(re2o_zone, dns_zone):
"""Handler for AAAA Record"""
for record in re2o_zone["aaaa_records"]:
logging.debug(f"AAAA = {record}")
if record["ipv6"] == []:
logging.debug("AAAA record does not have an IPv6. Skipping.")
return
# DNS Name object for Hostname
hostname = record["hostname"]
name_obj = dns.name.Name((hostname,))
ipv6_addr = record["ipv6"][0]["ipv6"] # thanks re2o
AAAA_obj = AAAA.AAAA(
dns.rdataclass.IN,
dns.rdatatype.AAAA,
ipv6_addr
)
add_to_zone(dns_zone, name_obj, AAAA_obj, record["ttl"])
def cname_records_handler(re2o_zone, dns_zone):
"""Handler fo CNAME records"""
for record in re2o_zone["cname_records"]:
logging.debug(f"CNAME = {record}")
alias_obj = dns.name.from_text(record["alias"])
name_obj = dns.name.from_text(record["hostname"], origin=None)
CNAME_obj = CNAME.CNAME(
dns.rdataclass.IN,
dns.rdatatype.CNAME,
alias_obj
)
add_to_zone(dns_zone, name_obj, CNAME_obj, record["ttl"])
def dname_records_handler(re2o_zone, dns_zone):
"""Handler for DNAME records"""
for record in re2o_zone["dname_records"]:
logging.debug(f"DNAME = {record}")
name_obj = format_re2o_domain(record["alias"])
target_obj = format_re2o_domain(record["zone"])
DNAME_obj = DNAME.DNAME(
dns.rdataclass.IN,
dns.rdatatype.DNAME,
target_obj
)
add_to_zone(dns_zone, name_obj, DNAME_obj, record["ttl"])
def pass_handler(zone, records):
"""
Do nothing (pass)
Handler which does nothing, used for edge cases like the pseudo-record
`name` returned by Re2oAPI or to disable some other handlers in the
`HANDLERS` variable.
"""
pass
HANDLERS = {
"soa": soa_handler,
"originv4": originv4_handler,
"originv6": originv6_handler,
"ns_records": ns_records_handler,
"sshfp": sshfp_record_handler,
"mx_records": mx_records_handler,
"txt_records": txt_records_handler,
"srv_records": srv_records_handler,
"a_records": a_records_handler,
"aaaa_records": aaaa_records_handler,
"cname_records": cname_records_handler,
"dname_records": dname_records_handler,
"name": pass_handler,
}