firewall/nft.py
2023-08-28 11:09:59 +02:00

240 lines
5 KiB
Python

from dataclasses import dataclass, field
from itertools import chain
from ipaddress import IPv4Network, IPv6Network
from typing import Any, Generic, TypeVar
T = TypeVar("T")
JsonNftables = dict[str, Any]
def flatten(l: list[list[T]]) -> list[T]:
return list(chain.from_iterable(l))
def ip_to_nft(ip: IPv4Network | IPv6Network) -> JsonNftables:
return {"prefix": {"addr": str(ip.network_address), "len": ip.prefixlen}}
# Expressions
@dataclass
class Ct:
key: str
def to_nft(self) -> JsonNftables:
return {"ct": {"key": self.key}}
@dataclass
class Fib:
flags: list[str]
result: str
def to_nft(self) -> JsonNftables:
return {"fib": {"flags": self.flags, "result": self.result}}
@dataclass(eq=True, frozen=True)
class Immediate(Generic[T]):
value: T
def to_nft(self) -> Any:
if isinstance(self.value, IPv4Network | IPv6Network):
return ip_to_nft(self.value)
if isinstance(self.value, set):
for elem in self.value:
if isinstance(elem, Immediate):
return {"set": [elem.to_nft() for elem in self.value]}
return {"set": list(self.value)}
return self.value
@dataclass
class Meta:
key: str
def to_nft(self) -> JsonNftables:
return {"meta": {"key": self.key}}
@dataclass
class Payload:
protocol: str
field: str
def to_nft(self) -> JsonNftables:
return {"payload": {"protocol": self.protocol, "field": self.field}}
Expression = Ct | Fib | Immediate | Meta | Payload
# Statements
@dataclass
class Counter:
def to_nft(self) -> JsonNftables:
return {"counter": {"packets": 0, "bytes": 0}}
@dataclass
class Goto:
target: str
def to_nft(self) -> JsonNftables:
return {"goto": {"target": self.target}}
@dataclass
class Jump:
target: str
def to_nft(self) -> JsonNftables:
return {"jump": {"target": self.target}}
@dataclass
class Match:
op: str
left: Expression
right: Expression
def to_nft(self) -> JsonNftables:
match = {
"op": self.op,
"left": self.left.to_nft(),
"right": self.right.to_nft(),
}
return {"match": match}
@dataclass
class Verdict:
verdict: str
target: str | None = None
def to_nft(self) -> JsonNftables:
return {self.verdict: self.target}
Statement = Counter | Goto | Match | Verdict
# Ruleset
@dataclass
class Set:
name: str
type: str
flags: list[str] | None = None
elements: list[Immediate[Any]] = field(default_factory=list)
def to_nft(self, family: str, table: str) -> JsonNftables:
set: JsonNftables = {
"name": self.name,
"family": family,
"table": table,
"type": self.type,
}
if self.elements:
set["elem"] = [element.to_nft() for element in self.elements]
if self.flags:
set["flags"] = self.flags
return {"add": {"set": set}}
@dataclass
class Rule:
stmts: list[Statement]
def to_nft(self, family: str, table: str, chain: str) -> JsonNftables:
rule = {
"family": family,
"table": table,
"chain": chain,
"expr": [stmt.to_nft() for stmt in self.stmts],
}
return {"add": {"rule": rule}}
@dataclass
class Chain:
name: str
type: str | None = None
hook: str | None = None
priority: int | None = None
policy: str | None = None
rules: list[Rule] = field(default_factory=list)
def to_nft(self, family: str, table: str) -> list[JsonNftables]:
chain: JsonNftables = {
"name": self.name,
"family": family,
"table": table,
}
if self.type is not None:
chain["type"] = self.type
if self.hook is not None:
chain["hook"] = self.hook
if self.priority is not None:
chain["prio"] = self.priority
if self.policy is not None:
chain["policy"] = self.policy
commands = [{"add": {"chain": chain}}]
for rule in self.rules:
commands.append(rule.to_nft(family, table, self.name))
return commands
@dataclass
class Table:
family: str
name: str
chains: list[Chain] = field(default_factory=list)
sets: list[Set] = field(default_factory=list)
def to_nft(self) -> list[JsonNftables]:
commands = [
{"add": {"table": {"family": self.family, "name": self.name}}}
]
for set in self.sets:
commands.append(set.to_nft(self.family, self.name))
for chain in self.chains:
commands.extend(chain.to_nft(self.family, self.name))
return commands
@dataclass
class Ruleset:
flush: bool
tables: list[Table] = field(default_factory=list)
def to_nft(self) -> JsonNftables:
ruleset = flatten([table.to_nft() for table in self.tables])
if self.flush:
ruleset.insert(0, {"flush": {"ruleset": None}})
return {"nftables": ruleset}