#! /bin/env python3 import argparse import collections import configparser import dns.name import dns.rdataset import dns.rdatatype from dns.rdtypes.ANY import CNAME, DNAME, MX, NS, SOA, SSHFP, TXT from dns.rdtypes.IN import AAAA, SRV, A from re2oapi import Re2oAPIClient def format_rname(mail: str): """ Format a email given by re2o API to a rname dnspython object. Given an email address in the standard string format `mail@example.tld` return an email address in the format required by RFC 1035 `mail.example.tld.` Return a `dns.name.Name` object. """ local, domain = mail.split("@") rname = dns.name.Name((local, *dns.name.from_text(domain))) return rname def format_mname(name: str): """ Format a zone name given by the re2o API to a mname dnspython object. Given a a name of the format `.zone.domain.tld` output the `zone.domain.tld.`, formatted accordingly to the RFC 1035. Return a `dns.name.name` object. """ if name[0] == ".": name = name[1:] mname = dns.name.from_text(name) return mname def soa_handler(zone, records): """Handler for SOA record""" soa = zone["soa"] records["@"].append( SOA.SOA( dns.rdataclass.IN, dns.rdatatype.SOA, format_mname(soa["name"]), format_rname(soa["mail"]), soa["serial"], soa["refresh"], soa["retry"], soa["expire"], soa["ttl"], ) ) def originv4_handler(zone, records): """Handler for the IPv4 origin""" ipv4_addr = zone["originv4"]["ipv4"] records["@"].append( A.A( dns.rdataclass.IN, dns.rdatatype.A, ipv4_addr ) ) def originv6_handler(zone, records): """Handler for the IPv6 origin""" ipv6_addr = zone["originv6"] # Yes, re2o is this weird and inconsistent records["@"].append( AAAA.AAAA( dns.rdataclass.IN, dns.rdatatype.AAAA, ipv6_addr ) ) def ns_records_handler(zone, records): """Handler for the NS record""" for record in zone["ns_records"]: target = record["target"] records["@"].append( NS.NS( dns.rdataclass.IN, dns.rdatatype.NS, target ) ) def sshfp_record_handler(zone, records): """Handler for the SSHFP record""" for record in zone["sshfp"]: for fp in record["sshfp"]: algorithm = fp["algo_id"] for fp_type in fp["hash"]: fingerprint = fp["hash"][fp_type] records[record["hostname"]].append( SSHFP.SSHFP( dns.rdataclass.IN, dns.rdatatype.SSHFP, algorithm, fp_type, fingerprint, ) ) def mx_records_handler(zone, records): """Handler for the MX record""" for record in zone["mx_records"]: preference = record["priority"] exchange = record["target"] records["@"].append( MX.MX( dns.rdataclass.IN, dns.rdatatype.MX, preference, exchange ) ) def txt_records_handler(zone, records): """Handler for TXT record""" for record in zone["txt_records"]: name = record["field1"] records[name].append( TXT.TXT( dns.rdataclass.IN, dns.rdatatype.TXT, record["field2"] ) ) def srv_records_handler(zone, records): """Handler for SRV record""" for record in zone["srv_records"]: name = dns.name.from_text(f"{record['service']}_{record['protocol']}") name_key = name.to_text() records[name_key].append( SRV.SRV( dns.rdataclass.IN, dns.rdatatype.SRV, record["priority"], record["weight"], record["port"], record["target"] ) ) def a_records_handler(zone, records): """Handler for A Record""" for record in zone["a_records"]: ipv4_addr = record["ipv4"] records[record["hostname"]].append( A.A( dns.rdataclass.IN, dns.rdatatype.A, ipv4_addr ) ) def aaaa_records_handler(zone, records): """Handler for AAAA Record""" for record in zone["aaaa_records"]: ipv6_addr = record["ipv6"][0]["ipv6"] # thanks re2o records[record["hostname"]].append( AAAA.AAAA( dns.rdataclass.IN, dns.rdatatype.AAAA, ipv6_addr ) ) def pass_handler(zone, records): pass HANDLERS = { "soa": soa_handler, "originv4": originv4_handler, "originv6": originv6_handler, "ns_records": ns_records_handler, "sshfp": sshfp_record_handler, "mx_records": mx_records_handler, "txt_records": txt_records_handler, "srv_records": srv_records_handler, "a_records": a_records_handler, "aaaa_records": aaaa_records_handler, "name": pass_handler, } parser = argparse.ArgumentParser() parser.add_argument( "-c", "--config", help="Path to the config file", type=str, default="config.ini", ) args = parser.parse_args() config = configparser.ConfigParser() config.read(args.config) api_client = Re2oAPIClient( config["Re2o"]["hostname"], config["Re2o"]["username"], config["Re2o"]["password"], use_tls=False, ) zones = api_client.list("dns/zones") records = collections.defaultdict(list) for zone in zones: for record in zone: if zone[record]: # only apply handler if record in not `None` or `[]` HANDLERS[record](zone, records)