diff --git a/library/dns_zone.py b/library/dns_zone.py new file mode 100755 index 0000000..033eff2 --- /dev/null +++ b/library/dns_zone.py @@ -0,0 +1,249 @@ +#!/usr/bin/env python3 +import itertools +import dataclasses + +from typing import Any + +import dns +import dns.serial +import dns.zone +import dns.rdata +import dns.rdataclass +import dns.rdatatype +import dns.rdtypes.IN.A +import dns.rdtypes.IN.AAAA +import dns.rdtypes.ANY.MX +import dns.rdtypes.ANY.SOA +import dns.rdtypes.ANY.NS +import dns.rdtypes.ANY.TXT + +from ansible.module_utils.basic import AnsibleModule + + +class RName(dns.name.Name): + def __init__(self, address): + try: + local, domain = address.split("@") + except ValueError: + raise ValueError( + "Invalid e-mail address format: {}".format(address) + ) + super().__init__((local,) + dns.name.from_text(domain).labels) + + +@dataclasses.dataclass +class A: + address: str + name: dns.name.Name = dns.name.empty + + def rdata(self) -> dns.rdata.Rdata: + return dns.rdtypes.IN.A.A( + dns.rdataclass.IN.IN, dns.rdatatype.A, self.address + ) + + +@dataclasses.dataclass +class AAAA: + address: str + name: dns.name.Name = dns.name.empty + + def rdata(self) -> dns.rdata.Rdata: + return dns.rdtypes.IN.AAAA.AAAA( + dns.rdataclass.IN.IN, dns.rdatatype.AAAA, self.address + ) + + +@dataclasses.dataclass +class CNAME: + address: dns.name.Name + name: dns.name.Name = dns.name.empty + + def rdata(self) -> dns.rdata.Rdata: + return dns.rdtypes.ANY.CNAME.CNAME( + dns.rdataclass.IN.IN, dns.rdatatype.CNAME, self.address + ) + + +@dataclasses.dataclass +class MX: + exchange: dns.name.Name + name: dns.name.Name = dns.name.empty + priority: int = 10 + + def rdata(self) -> dns.rdata.Rdata: + return dns.rdtypes.ANY.MX.MX( + dns.rdataclass.IN.IN, + dns.rdatatype.MX, + self.priority, + self.exchange, + ) + + +@dataclasses.dataclass +class NS: + address: dns.name.Name + name: dns.name.Name = dns.name.empty + + def rdata(self) -> dns.rdata.Rdata: + return dns.rdtypes.ANY.NS.NS( + dns.rdataclass.IN.IN, dns.rdatatype.NS, self.address + ) + + +@dataclasses.dataclass +class TXT: + data: str + name: dns.name.Name = dns.name.empty + + def rdata(self) -> dns.rdata.Rdata: + return dns.rdtypes.ANY.TXT.TXT( + dns.rdataclass.IN.IN, dns.rdatatype.TXT, self.data + ) + + +@dataclasses.dataclass +class SOA: + mname: dns.name.Name + rname: RName + refresh: int + retry: int + expire: int + ttl: int + serial: int = 1 + name: dns.name.Name = dns.name.empty + + def rdata(self) -> dns.rdata.Rdata: + return dns.rdtypes.ANY.SOA.SOA( + dns.rdataclass.IN.IN, + dns.rdatatype.SOA, + self.mname, + self.rname, + self.serial, + self.refresh, + self.retry, + self.expire, + self.ttl, + ) + + +def spec_option_of_field(field): + types = { + str: "str", + dns.name.Name: "str", + RName: "str", + int: "int", + } + return { + "type": types[field.type], + "required": field.default is dataclasses.MISSING, + } + + +def spec_options_of_type(ty): + return { + field.name: spec_option_of_field(field) + for field in dataclasses.fields(ty) + } + + +def coerce_dns_name(value: Any) -> dns.name.Name: + if not isinstance(value, dns.name.Name): + return dns.name.from_text(value, origin=dns.name.empty) + return value + + +def make_record(args, ty): + # TODO: Ça n'est pas du tout élégant, mais : + # 1. je n'ai pas réussi à spécifier dans `argument_spec` un type tiers + # 2. Ansible positionne à `None` les entrées non passées à la tâche et + # ce comportement ne semble pas modifiable + types = {f.name: f.type for f in dataclasses.fields(ty)} + coercers = { + dns.name.Name: coerce_dns_name, + RName: RName, + } + + def coerce(name, value): + if types[name] not in coercers: + return value + return coercers[types[name]](value) + + clean_args = { + name: coerce(name, value) + for name, value in args.items() + if value is not None + } + + return ty(**clean_args) + + +def zones_eq(a: dns.zone.Zone, b: dns.zone.Zone) -> bool: + return a.to_text(relativize=False) == b.to_text(relativize=False) + + +def main() -> int: + + record_types = { + "ns": NS, + "txt": TXT, + "a": A, + "aaaa": AAAA, + "mx": MX, + } + + module_args = { + "path": {"type": "path", "required": True}, + "origin": {"type": "str", "required": True}, + "soa": { + "type": "dict", + "required": True, + "options": spec_options_of_type(SOA), + }, + } + + for name, ty in record_types.items(): + module_args[name] = { + "type": "list", + "default": [], + "elements": "dict", + "options": spec_options_of_type(ty), + } + + module = AnsibleModule(argument_spec=module_args) + + origin = dns.name.from_text(module.params["origin"]) + path = module.params["path"] + + zone = dns.zone.Zone(origin) + + try: + current = dns.zone.from_file(path, origin=origin) + except: + current = None + + records = [make_record(module.params["soa"], SOA)] + + records.extend( + itertools.chain.from_iterable( + (make_record(args, ty) for args in module.params[name]) + for name, ty in record_types.items() + ) + ) + + for record in records: + node = zone.get_node(record.name, create=True) + rdata = record.rdata() + dataset = node.get_rdataset(rdata.rdclass, rdata.rdtype, create=True) + dataset.add(rdata) + + changed = current is None or not zones_eq(zone, current) + if changed: + zone.to_file(path, relativize=False) + + module.exit_json(changed=changed) + + return 0 + + +if __name__ == "__main__": + exit(main())