firewall/nft.py

266 lines
5.7 KiB
Python
Raw Permalink Normal View History

2023-08-27 12:56:41 +02:00
from dataclasses import dataclass, field
from itertools import chain
2023-08-30 22:34:29 +02:00
from ipaddress import IPv4Address, IPv4Network, IPv6Network
2023-08-30 16:22:44 +02:00
from typing import Any, Generic, TypeVar, get_args
2023-08-27 12:56:41 +02:00
# Generic element type used in the signature of flatten().
T = TypeVar("T")
# Alias for a dict with string keys and arbitrary values; used as the return
# type of the to_nft() methods in this module.
JsonNftables = dict[str, Any]
def flatten(l: list[list[T]]) -> list[T]:
    """Concatenate the inner lists of *l* into one flat list, keeping order."""
    flat: list[T] = []
    for inner in l:
        flat.extend(inner)
    return flat
2023-08-30 16:22:44 +02:00
# Literal value types that imm_to_nft() accepts and converts.
Immediate = int | str | bool | set | range | IPv4Network | IPv6Network
2023-08-28 02:32:40 +02:00
@dataclass
class Ct:
    """Expression serialised under the ``ct`` key."""

    key: str

    def to_nft(self) -> JsonNftables:
        """Return ``{"ct": {"key": <key>}}``."""
        body = {"key": self.key}
        return {"ct": body}
2023-08-27 22:32:33 +02:00
@dataclass
class Fib:
    """Expression serialised under the ``fib`` key."""

    flags: list[str]
    result: str

    def to_nft(self) -> JsonNftables:
        """Return ``{"fib": {"flags": <flags>, "result": <result>}}``."""
        body = dict(flags=self.flags, result=self.result)
        return {"fib": body}
@dataclass
class Meta:
    """Expression serialised under the ``meta`` key."""

    key: str

    def to_nft(self) -> JsonNftables:
        """Return ``{"meta": {"key": <key>}}``."""
        body = {"key": self.key}
        return {"meta": body}
2023-08-27 12:56:41 +02:00
@dataclass
class Payload:
    """Expression serialised under the ``payload`` key."""

    protocol: str
    field: str

    def to_nft(self) -> JsonNftables:
        """Return ``{"payload": {"protocol": <protocol>, "field": <field>}}``."""
        body = dict(protocol=self.protocol, field=self.field)
        return {"payload": body}
2023-08-30 16:22:44 +02:00
# Everything expr_to_nft() accepts: an immediate value or one of the
# expression dataclasses (Ct, Fib, Meta, Payload).
Expression = Ct | Fib | Immediate | Meta | Payload
2023-08-29 21:20:28 +02:00
2023-08-30 16:22:44 +02:00
def imm_to_nft(value: Immediate) -> Any:
    """Convert an immediate value into its JSON form.

    * ``range`` becomes ``{"range": [start, stop - 1]}``; only ``start`` and
      ``stop`` are read, the step is not.
    * IPv4/IPv6 networks become a ``prefix`` object with ``addr`` and ``len``.
    * ``set`` becomes ``{"set": [...]}`` with every member passed through
      ``expr_to_nft``.
    * Any other value is returned unchanged.
    """
    if isinstance(value, set):
        return {"set": [expr_to_nft(member) for member in value]}

    if isinstance(value, range):
        # Python ranges exclude ``stop``; the emitted pair includes both ends.
        first, last = value.start, value.stop - 1
        return {"range": [first, last]}

    if isinstance(value, (IPv4Network, IPv6Network)):
        prefix = {"addr": str(value.network_address), "len": value.prefixlen}
        return {"prefix": prefix}

    return value
2023-08-30 16:22:44 +02:00
def expr_to_nft(value: Expression) -> Any:
    """Serialise an expression.

    Values whose type is a member of ``Immediate`` go through ``imm_to_nft``;
    anything else is serialised by its own ``to_nft`` method.
    """
    immediate_types = get_args(Immediate)
    if not isinstance(value, immediate_types):
        return value.to_nft()  # type: ignore
    return imm_to_nft(value)  # type: ignore
2023-08-27 12:56:41 +02:00
2023-08-27 22:32:33 +02:00
# Statements
2023-08-28 02:32:40 +02:00
@dataclass
class Counter:
    """Statement serialised as a ``counter`` with zero packets and bytes."""

    def to_nft(self) -> JsonNftables:
        """Return ``{"counter": {"packets": 0, "bytes": 0}}``."""
        zeroed = {"packets": 0, "bytes": 0}
        return {"counter": zeroed}
@dataclass
class Goto:
    """Statement serialised under the ``goto`` key with a target name."""

    target: str

    def to_nft(self) -> JsonNftables:
        """Return ``{"goto": {"target": <target>}}``."""
        destination = {"target": self.target}
        return {"goto": destination}
2023-08-28 11:09:59 +02:00
@dataclass
class Jump:
    """Statement serialised under the ``jump`` key with a target name."""

    target: str

    def to_nft(self) -> JsonNftables:
        """Return ``{"jump": {"target": <target>}}``."""
        destination = {"target": self.target}
        return {"jump": destination}
2023-08-27 12:56:41 +02:00
@dataclass
class Match:
    """Statement comparing two expressions with the operator ``op``."""

    op: str
    left: Expression
    right: Expression

    def to_nft(self) -> JsonNftables:
        """Return ``{"match": {"op": ..., "left": ..., "right": ...}}``.

        Both operands are serialised with ``expr_to_nft``.
        """
        lhs = expr_to_nft(self.left)
        rhs = expr_to_nft(self.right)
        return {"match": {"op": self.op, "left": lhs, "right": rhs}}
2023-08-27 12:56:41 +02:00
2023-08-30 22:34:29 +02:00
@dataclass
class Snat:
    """Statement serialised under the ``snat`` key."""

    addr: IPv4Network | IPv4Address
    port: range | None
    persistent: bool

    def to_nft(self) -> JsonNftables:
        """Build the ``snat`` object.

        A network address is emitted as a ``range`` from its first to its last
        address; a single address is emitted as a plain string. ``port`` is
        added only when set, and ``flags`` only when ``persistent`` is true.
        """
        source = self.addr
        if isinstance(source, IPv4Network):
            first, last = source[0], source[-1]
            body: JsonNftables = {"addr": {"range": [str(first), str(last)]}}
        else:
            body = {"addr": str(source)}

        if self.port is not None:
            body["port"] = imm_to_nft(self.port)
        if self.persistent:
            body["flags"] = "persistent"
        return {"snat": body}
2023-08-27 22:32:33 +02:00
@dataclass
class Verdict:
    """Statement serialised as ``{<verdict>: ...}``.

    ``verdict`` is used as the single key of the emitted object. ``target`` is
    optional and defaults to ``None``.
    """

    verdict: str
    target: str | None = None

    def to_nft(self) -> JsonNftables:
        """Return the JSON object for this verdict.

        Without a target the value is ``None`` (e.g. ``{"accept": None}``).
        For ``jump`` and ``goto`` with a target, the target is wrapped as
        ``{"target": <target>}`` so the output has the same shape as the one
        produced by the ``Jump`` and ``Goto`` statements. Any other verdict
        name keeps the target as the raw value.
        """
        if self.target is not None and self.verdict in ("jump", "goto"):
            # Chain-transfer verdicts carry their chain name in a nested
            # object, not as a bare string.
            return {self.verdict: {"target": self.target}}
        return {self.verdict: self.target}
2023-08-30 22:34:29 +02:00
# Union of the statement dataclasses; Rule.stmts is a list of these.
Statement = Counter | Goto | Jump | Match | Snat | Verdict
2023-08-27 22:32:33 +02:00
# Ruleset
2023-08-27 21:35:39 +02:00
@dataclass
class Set:
    """A named set that serialises to an ``add set`` command."""

    name: str
    type: str
    flags: list[str] | None = None
    elements: list[Immediate] = field(default_factory=list)

    def to_nft(self, family: str, table: str) -> JsonNftables:
        """Return the ``{"add": {"set": ...}}`` command for this set.

        ``elem`` and ``flags`` are included only when they are non-empty;
        elements are converted with ``imm_to_nft``.
        """
        definition: JsonNftables = dict(
            name=self.name,
            family=family,
            table=table,
            type=self.type,
        )
        if self.elements:
            definition["elem"] = list(map(imm_to_nft, self.elements))
        if self.flags:
            definition["flags"] = self.flags
        return {"add": {"set": definition}}
2023-08-27 12:56:41 +02:00
@dataclass
class Rule:
    """An ordered list of statements that serialises to an ``add rule`` command."""

    stmts: list[Statement] = field(default_factory=list)

    def to_nft(self, family: str, table: str, chain: str) -> JsonNftables:
        """Return the ``{"add": {"rule": ...}}`` command for this rule.

        Each statement contributes its ``to_nft()`` output to ``expr``, in
        order.
        """
        serialised = [statement.to_nft() for statement in self.stmts]
        body = dict(family=family, table=table, chain=chain, expr=serialised)
        return {"add": {"rule": body}}
2023-08-27 12:56:41 +02:00
@dataclass
class Chain:
    """A chain and its rules, serialised as a list of ``add`` commands."""

    name: str
    type: str | None = None
    hook: str | None = None
    priority: int | None = None
    policy: str | None = None
    rules: list[Rule] = field(default_factory=list)

    def to_nft(self, family: str, table: str) -> list[JsonNftables]:
        """Return the ``add chain`` command followed by one command per rule.

        ``type``, ``hook``, ``prio`` and ``policy`` are emitted only for
        attributes that are not ``None``; ``priority`` is emitted as ``prio``.
        """
        definition: JsonNftables = dict(name=self.name, family=family, table=table)

        # (JSON key, attribute value) pairs, in the order they are emitted.
        optional = (
            ("type", self.type),
            ("hook", self.hook),
            ("prio", self.priority),
            ("policy", self.policy),
        )
        definition.update(
            {key: value for key, value in optional if value is not None}
        )

        return [
            {"add": {"chain": definition}},
            *(rule.to_nft(family, table, self.name) for rule in self.rules),
        ]
@dataclass
class Table:
    """A table with its sets and chains, serialised as a list of commands."""

    family: str
    name: str
    chains: list[Chain] = field(default_factory=list)
    sets: list[Set] = field(default_factory=list)

    def to_nft(self) -> list[JsonNftables]:
        """Return the ``add table`` command, then all sets, then all chains."""
        family, name = self.family, self.name
        commands = [{"add": {"table": {"family": family, "name": name}}}]
        commands += [nft_set.to_nft(family, name) for nft_set in self.sets]
        for nft_chain in self.chains:
            # Each chain yields several commands (itself plus its rules).
            commands += nft_chain.to_nft(family, name)
        return commands
@dataclass
class Ruleset:
    """Top-level container serialised as ``{"nftables": [commands...]}``."""

    flush: bool
    tables: list[Table] = field(default_factory=list)

    def to_nft(self) -> JsonNftables:
        """Return the full command list wrapped under the ``nftables`` key.

        When ``flush`` is true, a ``flush ruleset`` command is placed before
        the commands generated by the tables.
        """
        per_table = [table.to_nft() for table in self.tables]
        commands = flatten(per_table)
        if self.flush:
            commands = [{"flush": {"ruleset": None}}, *commands]
        return {"nftables": commands}