|
|
|
@ -1,5 +1,4 @@
|
|
|
|
|
from dataclasses import dataclass, field
|
|
|
|
|
from abc import ABC, abstractmethod
|
|
|
|
|
from itertools import chain
|
|
|
|
|
from ipaddress import IPv4Network, IPv6Network
|
|
|
|
|
from typing import Any, Generic, TypeVar
|
|
|
|
@ -12,30 +11,8 @@ def flatten(l: list[list[T]]) -> list[T]:
|
|
|
|
|
return list(chain.from_iterable(l))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Base(ABC):
|
|
|
|
|
@abstractmethod
|
|
|
|
|
def to_nft(self) -> JsonNftables:
|
|
|
|
|
...
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def to_nft(value: Any) -> JsonNftables:
|
|
|
|
|
if isinstance(value, Base):
|
|
|
|
|
return value.to_nft()
|
|
|
|
|
elif isinstance(value, IPv4Network | IPv6Network):
|
|
|
|
|
return {
|
|
|
|
|
"prefix": {
|
|
|
|
|
"addr": str(value.network_address),
|
|
|
|
|
"len": value.prefixlen,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
elif isinstance(value, set):
|
|
|
|
|
return {"set": [to_nft(e) for e in value]}
|
|
|
|
|
return value
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Expressions
|
|
|
|
|
@dataclass
|
|
|
|
|
class Ct(Base):
|
|
|
|
|
class Ct:
|
|
|
|
|
key: str
|
|
|
|
|
|
|
|
|
|
def to_nft(self) -> JsonNftables:
|
|
|
|
@ -43,7 +20,7 @@ class Ct(Base):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
|
class Fib(Base):
|
|
|
|
|
class Fib:
|
|
|
|
|
flags: list[str]
|
|
|
|
|
result: str
|
|
|
|
|
|
|
|
|
@ -52,7 +29,7 @@ class Fib(Base):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
|
class Meta(Base):
|
|
|
|
|
class Meta:
|
|
|
|
|
key: str
|
|
|
|
|
|
|
|
|
|
def to_nft(self) -> JsonNftables:
|
|
|
|
@ -60,7 +37,7 @@ class Meta(Base):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
|
class Payload(Base):
|
|
|
|
|
class Payload:
|
|
|
|
|
protocol: str
|
|
|
|
|
field: str
|
|
|
|
|
|
|
|
|
@ -68,19 +45,36 @@ class Payload(Base):
|
|
|
|
|
return {"payload": {"protocol": self.protocol, "field": self.field}}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Immediate = int | str | bool | IPv4Network | IPv6Network
|
|
|
|
|
Immediate = int | str | bool | range | IPv4Network | IPv6Network
|
|
|
|
|
|
|
|
|
|
def imm_to_nft(value: Immediate) -> JsonNftables:
|
|
|
|
|
if isinstance(value, range):
|
|
|
|
|
return {"range": [range.start, range.stop - 1]}
|
|
|
|
|
elif isinstance(value, IPv4Network | IPv6Network):
|
|
|
|
|
return {
|
|
|
|
|
"prefix": {
|
|
|
|
|
"addr": str(value.network_address),
|
|
|
|
|
"len": value.prefixlen,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
elif isinstance(value, set):
|
|
|
|
|
return {"set": [imm_to_nft(e) for e in value]}
|
|
|
|
|
return value
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Expressions
|
|
|
|
|
Expression = Ct | Fib | Immediate | Meta | Payload
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Statements
|
|
|
|
|
@dataclass
|
|
|
|
|
class Counter(Base):
|
|
|
|
|
class Counter:
|
|
|
|
|
def to_nft(self) -> JsonNftables:
|
|
|
|
|
return {"counter": {"packets": 0, "bytes": 0}}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
|
class Goto(Base):
|
|
|
|
|
class Goto:
|
|
|
|
|
target: str
|
|
|
|
|
|
|
|
|
|
def to_nft(self) -> JsonNftables:
|
|
|
|
@ -88,24 +82,15 @@ class Goto(Base):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
|
class Jump(Base):
|
|
|
|
|
class Jump:
|
|
|
|
|
target: str
|
|
|
|
|
|
|
|
|
|
def to_nft(self) -> JsonNftables:
|
|
|
|
|
return {"jump": {"target": self.target}}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass(eq=True, frozen=True)
|
|
|
|
|
class Range(Base):
|
|
|
|
|
start: int
|
|
|
|
|
end: int
|
|
|
|
|
|
|
|
|
|
def to_nft(self) -> JsonNftables:
|
|
|
|
|
return {"range": [self.start, self.end]}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
|
class Match(Base):
|
|
|
|
|
class Match:
|
|
|
|
|
op: str
|
|
|
|
|
left: Expression
|
|
|
|
|
right: Expression
|
|
|
|
@ -113,15 +98,15 @@ class Match(Base):
|
|
|
|
|
def to_nft(self) -> JsonNftables:
|
|
|
|
|
match = {
|
|
|
|
|
"op": self.op,
|
|
|
|
|
"left": to_nft(self.left),
|
|
|
|
|
"right": to_nft(self.right),
|
|
|
|
|
"left": imm_to_nft(self.left),
|
|
|
|
|
"right": imm_to_nft(self.right),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return {"match": match}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
|
class Verdict(Base):
|
|
|
|
|
class Verdict:
|
|
|
|
|
verdict: str
|
|
|
|
|
|
|
|
|
|
target: str | None = None
|
|
|
|
@ -151,7 +136,7 @@ class Set:
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if self.elements:
|
|
|
|
|
set["elem"] = [to_nft(e) for e in self.elements]
|
|
|
|
|
set["elem"] = [imm_to_nft(e) for e in self.elements]
|
|
|
|
|
|
|
|
|
|
if self.flags:
|
|
|
|
|
set["flags"] = self.flags
|
|
|
|
|