Initial DSL

This commit is contained in:
v-lafeychine 2023-04-16 23:11:54 +02:00
commit 5d062daf33
Signed by: v-lafeychine
GPG key ID: F46CAAD27C7AB0D5
2 changed files with 192 additions and 0 deletions

65
example_rules.py Normal file
View file

@ -0,0 +1,65 @@
---
zones:
- name: users-internet-allowed
include:
- rules.yaml
- name: mgmt
include:
- 10.203.0.0/16
- name: adm
include:
- 2a09:6840::/29
- 10.128.0.0/16
- name: internet
exclude:
- adm
- mgmt
blacklist:
enabled: true
addr:
- 0.0.0.0
reverse_path_filter:
enabled: true
filter:
input:
- iif: lo
verdict: accept
- src: mgmt
protocols:
tcp:
dport: "22,240..242"
verdict: accept
- src: backbone
protocols:
ospf: true
vrrp: true
tcp:
dport: 179
verdict: accept
- protocols:
icmp: true
verdict: accept
output:
- verdict: accept
forward:
- src: interco-crans
verdict: accept
- src: users-internet-allowed
tcp:
dport: 25
verdict: drop
- src: users-internet-allowed
dest:
- internet
- 10.0.0.1
verdict: accept
nat:
- src: mgmt
snat:
addr: 45.66.108.14
persistent: true
...

127
nftables.py Executable file
View file

@ -0,0 +1,127 @@
#!/usr/bin/env python3
from __future__ import annotations
from argparse import ArgumentParser, FileType
from enum import Enum
from pydantic import BaseModel, FilePath, IPvAnyAddress, IPvAnyNetwork, validator, root_validator
from yaml import safe_load
def parse_range_string(s):
parts = s.split(",")
values = []
for part in parts:
if ".." in part:
start, end = part.split("..")
start = int(start)
end = int(end)
values += [start + i for i in range(end - start + 1)]
else:
values.append(int(part))
return values
# Zones
class ZoneName(str):
pass
class Zone(BaseModel):
name: ZoneName
exclude: list[IPvAnyNetwork | FilePath | ZoneName] | None
include: list[IPvAnyNetwork | FilePath | ZoneName] | None
@root_validator()
def validate_mutually_exclusive(cls, values):
if values.get("exclude") and values.get("include"):
raise ValueError("exclude and include are mutually exclusive")
return values
# Blacklist
class BlackList(BaseModel):
enabled: bool = False
addr: list[IPvAnyAddress] = []
# Reverse Path Filter
class ReversePathFilter(BaseModel):
enabled: bool = False
# Filters
class Verdict(str, Enum):
accept = "accept"
drop = "drop"
reject = "reject"
class TcpProtocol(BaseModel):
dport: str | None
sport: str | None
@validator("dport", "sport")
def parse_range(cls, v):
return parse_range_string(v)
class UdpProtocol(BaseModel):
dport: str | None
sport: str | None
@validator("dport", "sport")
def parse_range(cls, v):
return parse_range_string(v)
class Protocols(BaseModel):
icmp: bool = False
ospf: bool = False
tcp: TcpProtocol | None
udp: UdpProtocol | None
vrrp: bool = False
class Rule(BaseModel):
iff: str | None
protocols: Protocols = Protocols()
src: ZoneName | list[IPvAnyNetwork | FilePath | ZoneName] | None
verdict: Verdict = Verdict.accept
class ForwardRule(Rule):
dest: ZoneName | list[IPvAnyNetwork | FilePath | ZoneName] | None
class Filter(BaseModel):
input: list[Rule] = []
output: list[Rule] = []
forward: list[ForwardRule] = []
# Nat
class SNat(BaseModel):
addr: IPvAnyAddress
persistent: bool = True
class Nat(BaseModel):
src: ZoneName | list[IPvAnyNetwork | FilePath | ZoneName] | None
snat: SNat
# Root model
class Firewall(BaseModel):
zones: list[Zone] = []
blacklist: BlackList | None
reverse_path_filter: ReversePathFilter | None
filter: Filter | None
nat: list[Nat] = []
def main():
parser = ArgumentParser()
parser.add_argument("file", type=FileType("r"), help="YAML rule file")
args = parser.parse_args()
rules = Firewall(**safe_load(args.file))
print(rules)
return 0
if __name__ == "__main__":
main()