#!/usr/bin/env python3
"""Build dnspython records from the zone data returned by the re2o API.

Each handler takes the re2o description of a zone (a mapping) and the
dnspython zone object being built, and adds the matching records to it.
`HANDLERS` maps every key of the re2o zone description to its handler.
"""
import argparse
import collections
import configparser
import logging

import dns.name
import dns.rdataclass
import dns.rdataset
import dns.rdatatype
import dns.resolver
from dns.rdtypes.ANY import CNAME, DNAME, MX, NS, SOA, SSHFP, TXT
from dns.rdtypes.IN import AAAA, SRV, A

# dns name object '@'
AT = dns.name.Name(())


def format_rname(mail: str) -> dns.name.Name:
    """
    Format an email given by the re2o API to a rname dnspython object.

    Given an email address in the standard string format
    `mail@example.tld`, return an email address in the format required by
    RFC 1035: `mail.example.tld.`

    Be careful when using this function. It is a very simple email parsing
    function and does not support the wide range of possible email formats.
    It also does not check that the email is valid.

    Return a `dns.name.Name` object.
    Raise `ValueError` if `mail` does not contain exactly one `@`.
    """
    local, domain = mail.split("@")
    # The local part becomes the first label, followed by the domain labels.
    rname = dns.name.Name((local, *dns.name.from_text(domain)))
    return rname


def format_re2o_domain(name: str) -> dns.name.Name:
    """
    Format a zone name given by the re2o API to a dnspython name object.

    Given a name of the format `.zone.domain.tld`, output
    `zone.domain.tld.`, formatted according to RFC 1035.

    Return a `dns.name.Name` object.
    """
    # `startswith` instead of `name[0]` so an empty string does not raise.
    if name.startswith("."):
        name = name[1:]
    return dns.name.from_text(name)


def add_to_zone(zone, name, rdata):
    """Add a rdata object to a zone object.

    The node `name` and the rdataset matching the class and type of `rdata`
    are created when they do not exist yet.
    """
    node = zone.find_node(name, create=True)
    rdataset = node.find_rdataset(rdata.rdclass, rdata.rdtype, create=True)
    rdataset.add(rdata)


def get_serial(dns_zone) -> int:
    """
    Query the serial number from the NS.

    The parameter `dns_zone` is the zone name, either as a `str` or as a
    `dns.name.Name` object (the SOA handler passes the zone origin).

    This is a best-effort lookup: 0 is returned if the query is
    unsuccessful.
    """
    try:
        # `resolve` replaces the deprecated `query` in dnspython 2.x; keep
        # the old entry point as a fallback for dnspython 1.x.
        resolve = getattr(dns.resolver, "resolve", None) or dns.resolver.query
        answer = resolve(dns_zone, "soa")
        # Index the answer itself rather than `answer.rrset.items`, which is
        # not a list in dnspython 2.x.
        serial = answer[0].serial
    except Exception:
        # Not a bare `except:` so KeyboardInterrupt/SystemExit still
        # propagate.
        logging.warning(
            "[GET SERIAL] failed to query serial for this zone. "
            "Fallback to default value 0"
        )
        serial = 0
    return serial


def update_serial(serial: int, serial_bits: int = 32) -> int:
    """Update serial number.

    According to RFC 1982 and Knot implementation.
    SERIAL_BITS = 32 by default, which means the serial number counter can
    range from 0 to 2^32 - 1; the increment wraps around to 0 afterwards.
    """
    return (serial + 1) % 2**serial_bits


def soa_handler(re2o_zone, dns_zone):
    """Handler for SOA record.

    The primary name server is the target of the first NS record, and the
    serial is the currently published one (0 if unknown) incremented by 1.
    """
    soa = re2o_zone["soa"]
    logging.debug(f"SOA = {soa}")

    ns = re2o_zone["ns_records"][0]["target"]
    ns_obj = dns.name.from_text(ns)

    origin = dns_zone.origin
    serial = update_serial(get_serial(origin))
    logging.debug(f"[SOA] zone_origin={origin} serial={serial}")

    soa_obj = SOA.SOA(
        dns.rdataclass.IN,
        dns.rdatatype.SOA,
        ns_obj,
        format_rname(soa["mail"]),
        serial,
        soa["refresh"],
        soa["retry"],
        soa["expire"],
        soa["ttl"],
    )
    add_to_zone(dns_zone, AT, soa_obj)


def originv4_handler(re2o_zone, dns_zone):
    """Handler for the IPv4 origin."""
    ipv4_addr = re2o_zone["originv4"]["ipv4"]
    logging.debug(f"originv4 = {re2o_zone['originv4']}")
    originv4_obj = A.A(dns.rdataclass.IN, dns.rdatatype.A, ipv4_addr)
    add_to_zone(dns_zone, AT, originv4_obj)


def originv6_handler(re2o_zone, dns_zone):
    """Handler for the IPv6 origin."""
    # Unlike `originv4`, the address is stored directly under the key.
    # Yes, re2o is this weird and inconsistent
    ipv6_addr = re2o_zone["originv6"]
    logging.debug(f"originv6 = {re2o_zone['originv6']}")
    originv6_obj = AAAA.AAAA(dns.rdataclass.IN, dns.rdatatype.AAAA, ipv6_addr)
    add_to_zone(dns_zone, AT, originv6_obj)


def ns_records_handler(re2o_zone, dns_zone):
    """Handler for the NS record."""
    for record in re2o_zone["ns_records"]:
        logging.debug(f"NS target = {record}")
        target_obj = dns.name.from_text(record["target"])
        ns_obj = NS.NS(dns.rdataclass.IN, dns.rdatatype.NS, target_obj)
        add_to_zone(dns_zone, AT, ns_obj)


def sshfp_record_handler(re2o_zone, dns_zone):
    """Handler for the SSHFP record.

    One SSHFP record is added per host, per key algorithm and per hash type.
    """
    for record in re2o_zone["sshfp"]:
        # DNS Name object for Hostname. The hostname is a single label, so
        # it is wrapped in a tuple: passing the bare string would create one
        # label per character.
        name_obj = dns.name.Name((record["hostname"],))
        for fp in record["sshfp"]:
            logging.debug(f"SSHFP = {fp}")
            algorithm = fp["algo_id"]
            # NOTE(review): values are passed through as received from
            # re2o; confirm they match the types SSHFP expects.
            for fp_type, fingerprint in fp["hash"].items():
                sshfp_obj = SSHFP.SSHFP(
                    dns.rdataclass.IN,
                    dns.rdatatype.SSHFP,
                    algorithm,
                    fp_type,
                    fingerprint,
                )
                add_to_zone(dns_zone, name_obj, sshfp_obj)


def mx_records_handler(re2o_zone, dns_zone):
    """Handler for the MX record."""
    for record in re2o_zone["mx_records"]:
        logging.debug(f"MX = {record}")
        preference = record["priority"]
        exchange_obj = dns.name.from_text(record["target"])
        mx_obj = MX.MX(
            dns.rdataclass.IN, dns.rdatatype.MX, preference, exchange_obj
        )
        add_to_zone(dns_zone, AT, mx_obj)


def txt_records_handler(re2o_zone, dns_zone):
    """Handler for TXT record.

    `field1` is the owner name (`@` for the zone itself) and `field2` the
    text; one pair of surrounding double quotes is stripped from the text.
    """
    for record in re2o_zone["txt_records"]:
        logging.debug(f"TXT = {record}")

        # DNS Name object for field1
        name = record["field1"]
        name_obj = AT if name == "@" else format_re2o_domain(name)

        txt_data = record["field2"]
        # Length check: an empty string must not raise, and a lone `"` is
        # not a quoted string.
        if (
            len(txt_data) >= 2
            and txt_data.startswith('"')
            and txt_data.endswith('"')
        ):
            txt_data = txt_data[1:-1]
        logging.debug(f"TXT-DATA = {txt_data}")

        txt_obj = TXT.TXT(dns.rdataclass.IN, dns.rdatatype.TXT, txt_data)
        add_to_zone(dns_zone, name_obj, txt_obj)


def srv_records_handler(re2o_zone, dns_zone):
    """Handler for SRV record."""
    for record in re2o_zone["srv_records"]:
        logging.debug(f"SRV = {record}")

        # DNS Name obj for SRV
        # NOTE(review): the owner name is built as `<service>_<protocol>`;
        # confirm this matches what re2o returns for these two fields.
        name_obj = dns.name.from_text(
            f"{record['service']}_{record['protocol']}"
        )
        srv_obj = SRV.SRV(
            dns.rdataclass.IN,
            dns.rdatatype.SRV,
            record["priority"],
            record["weight"],
            record["port"],
            record["target"],
        )
        add_to_zone(dns_zone, name_obj, srv_obj)


def a_records_handler(re2o_zone, dns_zone):
    """Handler for A Record."""
    for record in re2o_zone["a_records"]:
        logging.debug(f"A = {record}")

        # DNS Name object for Hostname
        name_obj = dns.name.Name((record["hostname"],))
        a_obj = A.A(dns.rdataclass.IN, dns.rdatatype.A, record["ipv4"])
        add_to_zone(dns_zone, name_obj, a_obj)


def aaaa_records_handler(re2o_zone, dns_zone):
    """Handler for AAAA Record.

    Hosts without any IPv6 address are skipped; only the first address of
    each host is published.
    """
    for record in re2o_zone["aaaa_records"]:
        logging.debug(f"AAAA = {record}")

        if not record["ipv6"]:
            logging.debug("AAAA record does not have an IPv6. Skipping.")
            # Skip this host only: the remaining records must still be
            # processed.
            continue

        # DNS Name object for Hostname
        name_obj = dns.name.Name((record["hostname"],))
        ipv6_addr = record["ipv6"][0]["ipv6"]  # thanks re2o
        aaaa_obj = AAAA.AAAA(dns.rdataclass.IN, dns.rdatatype.AAAA, ipv6_addr)
        add_to_zone(dns_zone, name_obj, aaaa_obj)


def cname_records_handler(re2o_zone, dns_zone):
    """Handler for CNAME records."""
    for record in re2o_zone["cname_records"]:
        logging.debug(f"CNAME = {record}")
        alias_obj = dns.name.from_text(record["alias"])
        # origin=None keeps the owner name relative to the zone.
        name_obj = dns.name.from_text(record["hostname"], origin=None)
        cname_obj = CNAME.CNAME(
            dns.rdataclass.IN, dns.rdatatype.CNAME, alias_obj
        )
        add_to_zone(dns_zone, name_obj, cname_obj)


def dname_records_handler(re2o_zone, dns_zone):
    """Handler for DNAME records."""
    for record in re2o_zone["dname_records"]:
        logging.debug(f"DNAME = {record}")
        name_obj = format_re2o_domain(record["alias"])
        target_obj = format_re2o_domain(record["zone"])
        dname_obj = DNAME.DNAME(
            dns.rdataclass.IN, dns.rdatatype.DNAME, target_obj
        )
        add_to_zone(dns_zone, name_obj, dname_obj)


def pass_handler(zone, records):
    """
    Do nothing (pass).

    Handler which does nothing, used for edge cases like the pseudo-record
    `name` returned by Re2oAPI or to disable some other handlers in the
    `HANDLERS` variable.
    """
    pass


# Maps each key of the re2o zone description to the handler processing it.
HANDLERS = {
    "soa": soa_handler,
    "originv4": originv4_handler,
    "originv6": originv6_handler,
    "ns_records": ns_records_handler,
    "sshfp": sshfp_record_handler,
    "mx_records": mx_records_handler,
    "txt_records": txt_records_handler,
    "srv_records": srv_records_handler,
    "a_records": a_records_handler,
    "aaaa_records": aaaa_records_handler,
    "cname_records": cname_records_handler,
    "dname_records": dname_records_handler,
    "name": pass_handler,
}