Manage DNS servers using Ansible #93

Open
jeltz wants to merge 45 commits from dns into master
Showing only changes of commit c97dca8fa8 - Show all commits

249
library/dns_zone.py Executable file
View file

@ -0,0 +1,249 @@
#!/usr/bin/env python3
import itertools
import dataclasses
from typing import Any
import dns
import dns.serial
import dns.zone
import dns.rdata
import dns.rdataclass
import dns.rdatatype
import dns.rdtypes.IN.A
import dns.rdtypes.IN.AAAA
import dns.rdtypes.ANY.MX
import dns.rdtypes.ANY.SOA
import dns.rdtypes.ANY.NS
import dns.rdtypes.ANY.TXT
from ansible.module_utils.basic import AnsibleModule
class RName(dns.name.Name):
def __init__(self, address):
try:
local, domain = address.split("@")
except ValueError:
raise ValueError(
"Invalid e-mail address format: {}".format(address)
)
super().__init__((local,) + dns.name.from_text(domain).labels)
@dataclasses.dataclass
class A:
address: str
name: dns.name.Name = dns.name.empty
def rdata(self) -> dns.rdata.Rdata:
return dns.rdtypes.IN.A.A(
dns.rdataclass.IN.IN, dns.rdatatype.A, self.address
)
@dataclasses.dataclass
class AAAA:
address: str
name: dns.name.Name = dns.name.empty
def rdata(self) -> dns.rdata.Rdata:
return dns.rdtypes.IN.AAAA.AAAA(
dns.rdataclass.IN.IN, dns.rdatatype.AAAA, self.address
)
@dataclasses.dataclass
class CNAME:
address: dns.name.Name
name: dns.name.Name = dns.name.empty
def rdata(self) -> dns.rdata.Rdata:
return dns.rdtypes.ANY.CNAME.CNAME(
dns.rdataclass.IN.IN, dns.rdatatype.CNAME, self.address
)
@dataclasses.dataclass
class MX:
exchange: dns.name.Name
name: dns.name.Name = dns.name.empty
priority: int = 10
def rdata(self) -> dns.rdata.Rdata:
return dns.rdtypes.ANY.MX.MX(
dns.rdataclass.IN.IN,
dns.rdatatype.MX,
self.priority,
self.exchange,
)
@dataclasses.dataclass
class NS:
address: dns.name.Name
name: dns.name.Name = dns.name.empty
def rdata(self) -> dns.rdata.Rdata:
return dns.rdtypes.ANY.NS.NS(
dns.rdataclass.IN.IN, dns.rdatatype.NS, self.address
)
@dataclasses.dataclass
class TXT:
data: str
name: dns.name.Name = dns.name.empty
def rdata(self) -> dns.rdata.Rdata:
return dns.rdtypes.ANY.TXT.TXT(
dns.rdataclass.IN.IN, dns.rdatatype.TXT, self.data
)
@dataclasses.dataclass
class SOA:
mname: dns.name.Name
rname: RName
refresh: int
retry: int
expire: int
ttl: int
serial: int = 1
name: dns.name.Name = dns.name.empty
def rdata(self) -> dns.rdata.Rdata:
return dns.rdtypes.ANY.SOA.SOA(
dns.rdataclass.IN.IN,
dns.rdatatype.SOA,
self.mname,
self.rname,
self.serial,
self.refresh,
self.retry,
self.expire,
self.ttl,
)
def spec_option_of_field(field):
types = {
str: "str",
dns.name.Name: "str",
RName: "str",
int: "int",
}
return {
"type": types[field.type],
"required": field.default is dataclasses.MISSING,
}
def spec_options_of_type(ty):
return {
field.name: spec_option_of_field(field)
for field in dataclasses.fields(ty)
}
def coerce_dns_name(value: Any) -> dns.name.Name:
if not isinstance(value, dns.name.Name):
return dns.name.from_text(value, origin=dns.name.empty)
return value
def make_record(args, ty):
# TODO: Ça n'est pas du tout élégant, mais :
# 1. je n'ai pas réussi à spécifier dans `argument_spec` un type tiers
# 2. Ansible positionne à `None` les entrées non passées à la tâche et
# ce comportement ne semble pas modifiable
types = {f.name: f.type for f in dataclasses.fields(ty)}
coercers = {
dns.name.Name: coerce_dns_name,
RName: RName,
}
def coerce(name, value):
if types[name] not in coercers:
return value
return coercers[types[name]](value)
clean_args = {
name: coerce(name, value)
for name, value in args.items()
if value is not None
}
return ty(**clean_args)
def zones_eq(a: dns.zone.Zone, b: dns.zone.Zone) -> bool:
return a.to_text(relativize=False) == b.to_text(relativize=False)
def main() -> int:
record_types = {
"ns": NS,
"txt": TXT,
"a": A,
"aaaa": AAAA,
"mx": MX,
}
module_args = {
"path": {"type": "path", "required": True},
"origin": {"type": "str", "required": True},
"soa": {
"type": "dict",
"required": True,
"options": spec_options_of_type(SOA),
},
}
for name, ty in record_types.items():
module_args[name] = {
"type": "list",
"default": [],
"elements": "dict",
"options": spec_options_of_type(ty),
}
module = AnsibleModule(argument_spec=module_args)
origin = dns.name.from_text(module.params["origin"])
path = module.params["path"]
zone = dns.zone.Zone(origin)
try:
current = dns.zone.from_file(path, origin=origin)
except:
current = None
records = [make_record(module.params["soa"], SOA)]
records.extend(
itertools.chain.from_iterable(
(make_record(args, ty) for args in module.params[name])
for name, ty in record_types.items()
)
)
for record in records:
node = zone.get_node(record.name, create=True)
rdata = record.rdata()
dataset = node.get_rdataset(rdata.rdclass, rdata.rdtype, create=True)
dataset.add(rdata)
changed = current is None or not zones_eq(zone, current)
if changed:
zone.to_file(path, relativize=False)
module.exit_json(changed=changed)
return 0
if __name__ == "__main__":
exit(main())