firewall/nftables.py

154 lines
3.2 KiB
Python
Raw Normal View History

2023-04-16 23:11:54 +02:00
#!/usr/bin/env python3
from __future__ import annotations
from argparse import ArgumentParser, FileType
from enum import Enum
from pydantic import (
BaseModel,
Extra,
FilePath,
IPvAnyAddress,
IPvAnyNetwork,
validator,
root_validator,
)
2023-04-16 23:11:54 +02:00
from yaml import safe_load
class RestrictiveBaseModel(BaseModel, extra=Extra.forbid):
    """Common base for every model in this file; configured with ``Extra.forbid``."""
2023-04-16 23:11:54 +02:00
def parse_range_string(s: str) -> list[int | range]:
    """Parse a comma-separated list of integers and inclusive ranges.

    Each comma-separated item is either a single integer (``"80"``) or an
    inclusive range written as ``start..end`` (``"8000..8080"``).

    Args:
        s: The string to parse, e.g. ``"22,80,8000..8080"``.

    Returns:
        One entry per item, in input order: an ``int`` for a single value,
        or a ``range`` covering ``start`` through ``end`` inclusive.

    Raises:
        ValueError: If an item is not an integer, a range is malformed, or
            a range's start is greater than its end.
    """
    values: list[int | range] = []
    for part in s.split(","):
        if ".." not in part:
            values.append(int(part))
            continue

        # Only the first ".." separates the bounds; anything left over ends
        # up in end_text and is rejected by int() with a ValueError.
        start_text, _, end_text = part.partition("..")
        start, end = int(start_text), int(end_text)
        if start > end:
            # range(start, end + 1) would be silently empty here.
            raise ValueError(
                f"invalid range {part!r}: start is greater than end"
            )
        values.append(range(start, end + 1))
    return values
# Zones
2023-04-16 23:11:54 +02:00
class ZoneName(str):
    """A ``str`` subclass used as the type of zone names; adds no behaviour."""
class Zone(RestrictiveBaseModel):
    """A named zone described by exactly one of ``include`` or ``exclude``.

    Raises (through pydantic validation):
        ValueError: If both ``include`` and ``exclude`` are given, or if
            neither is given.
    """

    name: ZoneName
    # Each entry is validated as a network, a zone name, or a file path.
    # NOTE(review): ZoneName is a plain str subclass, so in a left-to-right
    # union it likely accepts any string before FilePath is tried - confirm.
    exclude: list[IPvAnyNetwork | ZoneName | FilePath] | None = None
    include: list[IPvAnyNetwork | ZoneName | FilePath] | None = None

    # skip_on_failure: when a field has already failed validation it is
    # missing from ``values``; running this check anyway would add a
    # misleading "must be set" error on top of the real one.
    @root_validator(skip_on_failure=True)
    def validate_mutually_exactly_one(cls, values):
        """Require that exactly one of ``exclude`` / ``include`` is set."""
        # "Set" means "not None" in both checks, so an empty list counts as
        # set and cannot slip past the mutual-exclusion test.
        has_exclude = values.get("exclude") is not None
        has_include = values.get("include") is not None

        if has_exclude and has_include:
            raise ValueError("exclude and include are mutually exclusive")
        if not (has_exclude or has_include):
            raise ValueError("exactly one of exclude and include must be set")
        return values
# Blacklist
class BlackList(RestrictiveBaseModel):
    """Blacklist settings: an on/off switch plus a list of IP addresses."""

    # Off unless the rule file turns it on.
    enabled: bool = False
    # IPv4 or IPv6 addresses; empty when not given.
    addr: list[IPvAnyAddress] = []
# Reverse Path Filter
class ReversePathFilter(RestrictiveBaseModel):
    """Reverse path filter settings; currently only an on/off switch."""

    # Off unless the rule file turns it on.
    enabled: bool = False
# Filters
2023-04-16 23:11:54 +02:00
class Verdict(str, Enum):
    """Verdict of a rule; each member is also a ``str`` equal to its name."""

    accept = "accept"
    drop = "drop"
    reject = "reject"
class TcpProtocol(RestrictiveBaseModel):
    """TCP match options.

    ``dport`` and ``sport`` (presumably destination and source ports) are
    given as strings such as ``"22,8000..8080"``. After validation each one
    holds the list returned by ``parse_range_string`` rather than the
    original string.
    """

    dport: str | None = None
    sport: str | None = None

    @validator("dport", "sport")
    def parse_range(cls, v):
        """Convert a port string into its parsed values; keep ``None`` as is."""
        # An explicit null in the input can reach this validator as None;
        # passing it on would fail with AttributeError on ``None.split``.
        if v is None:
            return None
        return parse_range_string(v)
class UdpProtocol(RestrictiveBaseModel):
    """UDP match options.

    ``dport`` and ``sport`` (presumably destination and source ports) are
    given as strings such as ``"53,6000..6010"``. After validation each one
    holds the list returned by ``parse_range_string`` rather than the
    original string.
    """

    dport: str | None = None
    sport: str | None = None

    @validator("dport", "sport")
    def parse_range(cls, v):
        """Convert a port string into its parsed values; keep ``None`` as is."""
        # An explicit null in the input can reach this validator as None;
        # passing it on would fail with AttributeError on ``None.split``.
        if v is None:
            return None
        return parse_range_string(v)
class Protocols(RestrictiveBaseModel):
    """Protocol selection for a rule.

    ICMP, OSPF and VRRP are plain flags that default to off; TCP and UDP
    carry their own option models and default to ``None``.
    """

    icmp: bool = False
    ospf: bool = False
    tcp: TcpProtocol | None = None
    udp: UdpProtocol | None = None
    vrrp: bool = False
class Rule(RestrictiveBaseModel):
    """A single filter rule; every field is optional."""

    # Presumably input / output interface names (nftables iif / oif) - confirm.
    iif: str | None = None
    oif: str | None = None
    # Defaults to a Protocols model with all of its own defaults.
    protocols: Protocols = Protocols()
    # Either one zone name, or a list of networks, zone names and file paths.
    src: ZoneName | list[IPvAnyNetwork | ZoneName | FilePath] | None = None
    # Rules accept unless the rule file says otherwise.
    verdict: Verdict = Verdict.accept
2023-04-16 23:11:54 +02:00
class ForwardRule(Rule):
    """A ``Rule`` with an additional, optional ``dest`` field."""

    # Same shape as Rule.src: one zone name, or a list of networks, zone
    # names and file paths.
    dest: ZoneName | list[IPvAnyNetwork | ZoneName | FilePath] | None = None
2023-04-16 23:11:54 +02:00
class Filter(RestrictiveBaseModel):
    """Filter rules grouped as input, output and forward lists.

    Each list is empty when not given; only ``forward`` holds
    ``ForwardRule`` entries.
    """

    input: list[Rule] = []
    output: list[Rule] = []
    forward: list[ForwardRule] = []
# Nat
class SNat(RestrictiveBaseModel):
    """Source NAT settings: a required address and a ``persistent`` flag."""

    # Required IPv4 or IPv6 address.
    addr: IPvAnyAddress
    # On unless the rule file turns it off.
    persistent: bool = True
class Nat(RestrictiveBaseModel):
    """One NAT entry: an optional source selection plus required SNAT settings."""

    # Either one zone name, or a list of networks, zone names and file paths.
    src: ZoneName | list[IPvAnyNetwork | ZoneName | FilePath] | None = None
    snat: SNat
# Root model
class Firewall(RestrictiveBaseModel):
    """Root model of the rule file.

    Every section is optional: the list sections default to empty lists
    and the single-model sections default to ``None``.
    """

    zones: list[Zone] = []
    blacklist: BlackList | None = None
    reverse_path_filter: ReversePathFilter | None = None
    filter: Filter | None = None
    nat: list[Nat] = []
2023-04-16 23:11:54 +02:00
def main():
    """Load the YAML rule file named on the command line and print the model.

    Returns:
        ``0`` when the file was loaded and validated.

    Raises:
        SystemExit: Via argparse, for bad arguments or when the top-level
            YAML node is not a mapping.
        Validation errors raised by ``Firewall`` are not caught here.
    """
    parser = ArgumentParser()
    parser.add_argument("file", type=FileType("r"), help="YAML rule file")
    args = parser.parse_args()

    # FileType opens the file while parsing arguments; close it once read.
    with args.file as stream:
        document = safe_load(stream)

    # An empty YAML document loads as None; treat it as an empty rule set
    # instead of failing on ``**None``.
    if document is None:
        document = {}
    if not isinstance(document, dict):
        parser.error("top-level YAML node must be a mapping")

    rules = Firewall(**document)
    print(rules)
    return 0
2023-04-16 23:11:54 +02:00
if __name__ == "__main__":
    # Use main()'s return value as the process exit status instead of
    # discarding it.
    raise SystemExit(main())