firewall/nft.py

183 lines
3.9 KiB
Python
Raw Normal View History

2023-08-27 12:56:41 +02:00
from dataclasses import dataclass, field
from itertools import chain
2023-08-27 21:35:39 +02:00
from ipaddress import IPv4Network, IPv6Network
from typing import Any, Generic, TypeVar
2023-08-27 12:56:41 +02:00
T = TypeVar("T")
JsonNftables = dict[str, Any]
def flatten(l: list[list[T]]) -> list[T]:
return list(chain.from_iterable(l))
2023-08-27 21:35:39 +02:00
def ip_to_nft(ip: IPv4Network | IPv6Network) -> JsonNftables:
return {"prefix": {"addr": str(ip.network_address), "len": ip.prefixlen}}
2023-08-27 21:35:39 +02:00
@dataclass(eq=True, frozen=True)
class Immediate(Generic[T]):
value: T
2023-08-27 12:56:41 +02:00
2023-08-27 21:35:39 +02:00
def to_nft(self) -> Any:
if isinstance(self.value, IPv4Network) or isinstance(
self.value, IPv6Network
):
return ip_to_nft(self.value)
2023-08-27 12:56:41 +02:00
return self.value
@dataclass
class Payload:
protocol: str
field: str
def to_nft(self) -> JsonNftables:
return {"payload": {"protocol": self.protocol, "field": self.field}}
Expression = Immediate | Payload
@dataclass
class Verdict:
verdict: str
target: str | None = None
def to_nft(self) -> JsonNftables:
return {self.verdict: self.target}
@dataclass
class Match:
op: str
left: Expression
right: Expression
def to_nft(self) -> JsonNftables:
2023-08-27 21:35:39 +02:00
match = {
"op": self.op,
"left": self.left.to_nft(),
"right": self.right.to_nft(),
2023-08-27 12:56:41 +02:00
}
2023-08-27 21:35:39 +02:00
return {"match": match}
2023-08-27 12:56:41 +02:00
Statement = Verdict | Match
2023-08-27 21:35:39 +02:00
@dataclass
class Set:
name: str
flags: list[str]
type: str | list[str]
elements: list[Immediate] = field(default_factory=list)
def to_nft(self, family: str, table: str) -> JsonNftables:
set: JsonNftables = {
"name": self.name,
"family": family,
"table": table,
"flags": self.flags,
"type": self.type,
}
if self.elements:
set["elem"] = [element.to_nft() for element in self.elements]
return {"add": {"set": set}}
2023-08-27 12:56:41 +02:00
@dataclass
class Rule:
stmts: list[Statement]
def to_nft(self, family: str, table: str, chain: str) -> JsonNftables:
2023-08-27 21:35:39 +02:00
rule = {
"family": family,
"table": table,
"chain": chain,
"expr": [stmt.to_nft() for stmt in self.stmts],
2023-08-27 12:56:41 +02:00
}
2023-08-27 21:35:39 +02:00
return {"add": {"rule": rule}}
2023-08-27 12:56:41 +02:00
@dataclass
class Chain:
name: str
type: str | None = None
hook: str | None = None
priority: int | None = None
policy: str | None = None
rules: list[Rule] = field(default_factory=list)
def to_nft(self, family: str, table: str) -> list[JsonNftables]:
chain: JsonNftables = {
"name": self.name,
"family": family,
"table": table,
}
if self.type is not None:
chain["type"] = self.type
if self.hook is not None:
chain["hook"] = self.hook
if self.priority is not None:
chain["prio"] = self.priority
if self.policy is not None:
chain["policy"] = self.policy
commands = [{"add": {"chain": chain}}]
for rule in self.rules:
commands.append(rule.to_nft(family, table, self.name))
return commands
@dataclass
class Table:
family: str
name: str
chains: list[Chain] = field(default_factory=list)
sets: list[Set] = field(default_factory=list)
def to_nft(self) -> list[JsonNftables]:
2023-08-27 21:35:39 +02:00
commands = [
{"add": {"table": {"family": self.family, "name": self.name}}}
]
2023-08-27 12:56:41 +02:00
for set in self.sets:
commands.append(set.to_nft(self.family, self.name))
for chain in self.chains:
commands.extend(chain.to_nft(self.family, self.name))
return commands
@dataclass
class Ruleset:
flush: bool
tables: list[Table] = field(default_factory=list)
def to_nft(self) -> JsonNftables:
ruleset = flatten([table.to_nft() for table in self.tables])
if self.flush:
ruleset.insert(0, {"flush": {"ruleset": None}})
return {"nftables": ruleset}