2022-08-19 04:57:27 +02:00
|
|
|
import ipaddress
|
2022-08-27 13:47:08 +02:00
|
|
|
from operator import attrgetter
|
2022-08-19 04:57:27 +02:00
|
|
|
|
|
|
|
import dns.name
|
|
|
|
|
|
|
|
|
|
|
|
class FilterModule:
|
|
|
|
def filters(self):
|
|
|
|
return {
|
|
|
|
"add_origin": add_origin,
|
|
|
|
"add_origin_keys": add_origin_keys,
|
|
|
|
"ip_filter": ip_filter,
|
2022-08-27 04:22:15 +02:00
|
|
|
"remove_domain_suffix": remove_domain_suffix,
|
2022-08-27 13:47:08 +02:00
|
|
|
"ipaddr_sort": ipaddr_sort,
|
2022-08-19 04:57:27 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
def ip_filter(addresses, networks):
|
|
|
|
if isinstance(addresses, dict):
|
|
|
|
return {k: ip_filter(v, networks) for k, v in addresses.items()}
|
|
|
|
ip_networks = [ipaddress.ip_network(n) for n in networks]
|
|
|
|
ip_addresses = [ipaddress.ip_address(a) for a in addresses]
|
|
|
|
return [str(a) for a in ip_addresses if any(a in n for n in ip_networks)]
|
|
|
|
|
|
|
|
|
|
|
|
def add_origin(name, origin="."):
|
|
|
|
return dns.name.from_text(name, dns.name.from_text(origin)).to_text()
|
|
|
|
|
|
|
|
|
|
|
|
def add_origin_keys(dct, origin="."):
|
|
|
|
return {add_origin(k, origin): v for k, v in dct.items()}
|
2022-08-30 13:54:54 +02:00
|
|
|
|
|
|
|
|
2022-08-27 04:22:15 +02:00
|
|
|
def remove_domain_suffix(name):
|
|
|
|
parent = dns.name.from_text(name).parent()
|
|
|
|
return parent.to_text()
|
2022-08-27 13:47:08 +02:00
|
|
|
|
|
|
|
|
|
|
|
def ipaddr_sort(addrs, types, unknown_after=True):
|
|
|
|
check_types = {
|
|
|
|
"global": attrgetter("is_global"),
|
|
|
|
"link-local": attrgetter("is_link_local"),
|
|
|
|
"loopback": attrgetter("is_loopback"),
|
|
|
|
"multicast": attrgetter("is_multicast"),
|
|
|
|
"private": attrgetter("is_private"),
|
|
|
|
"reserved": attrgetter("is_reserved"),
|
|
|
|
"site_local": attrgetter("is_site_local"),
|
|
|
|
"unspecified": attrgetter("is_unspecified"),
|
|
|
|
}
|
|
|
|
|
|
|
|
def addr_weight(addr):
|
|
|
|
if isinstance(addr, str):
|
|
|
|
addr = ipaddress.ip_address(addr.split("/")[0])
|
|
|
|
for index, ty in enumerate(types):
|
|
|
|
if check_types[ty](ipaddress.ip_address(addr)):
|
|
|
|
return index
|
|
|
|
return len(types) if unknown_after else -1
|
|
|
|
|
|
|
|
return sorted(addrs, key=addr_weight)
|