From 5d062daf33b21ab68b2b17c85f73d6ea45584af2 Mon Sep 17 00:00:00 2001 From: Vincent Lafeychine Date: Sun, 16 Apr 2023 23:11:54 +0200 Subject: [PATCH] Initial DSL --- example_rules.py | 65 ++++++++++++++++++++++++ nftables.py | 127 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 192 insertions(+) create mode 100644 example_rules.py create mode 100755 nftables.py diff --git a/example_rules.py b/example_rules.py new file mode 100644 index 0000000..ea69f81 --- /dev/null +++ b/example_rules.py @@ -0,0 +1,65 @@ +--- +zones: + - name: users-internet-allowed + include: + - rules.yaml + - name: mgmt + include: + - 10.203.0.0/16 + - name: adm + include: + - 2a09:6840::/29 + - 10.128.0.0/16 + - name: internet + exclude: + - adm + - mgmt + +blacklist: + enabled: true + addr: + - 0.0.0.0 + +reverse_path_filter: + enabled: true + +filter: + input: + - iif: lo + verdict: accept + - src: mgmt + protocols: + tcp: + dport: "22,240..242" + verdict: accept + - src: backbone + protocols: + ospf: true + vrrp: true + tcp: + dport: 179 + verdict: accept + - protocols: + icmp: true + verdict: accept + output: + - verdict: accept + forward: + - src: interco-crans + verdict: accept + - src: users-internet-allowed + tcp: + dport: 25 + verdict: drop + - src: users-internet-allowed + dest: + - internet + - 10.0.0.1 + verdict: accept + +nat: + - src: mgmt + snat: + addr: 45.66.108.14 + persistent: true +... diff --git a/nftables.py b/nftables.py new file mode 100755 index 0000000..ee05f5c --- /dev/null +++ b/nftables.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python3 + +from __future__ import annotations +from argparse import ArgumentParser, FileType +from enum import Enum +from pydantic import BaseModel, FilePath, IPvAnyAddress, IPvAnyNetwork, validator, root_validator +from yaml import safe_load + +def parse_range_string(s): + parts = s.split(",") + values = [] + + for part in parts: + if ".." in part: + start, end = part.split("..") + start = int(start) + end = int(end) + values += [start + i for i in range(end - start + 1)] + else: + values.append(int(part)) + + return values + +# Zones + +class ZoneName(str): + pass + +class Zone(BaseModel): + name: ZoneName + exclude: list[IPvAnyNetwork | FilePath | ZoneName] | None + include: list[IPvAnyNetwork | FilePath | ZoneName] | None + + @root_validator() + def validate_mutually_exclusive(cls, values): + if values.get("exclude") and values.get("include"): + raise ValueError("exclude and include are mutually exclusive") + return values + +# Blacklist + +class BlackList(BaseModel): + enabled: bool = False + addr: list[IPvAnyAddress] = [] + +# Reverse Path Filter + +class ReversePathFilter(BaseModel): + enabled: bool = False + +# Filters + +class Verdict(str, Enum): + accept = "accept" + drop = "drop" + reject = "reject" + +class TcpProtocol(BaseModel): + dport: str | None + sport: str | None + + @validator("dport", "sport") + def parse_range(cls, v): + return parse_range_string(v) + +class UdpProtocol(BaseModel): + dport: str | None + sport: str | None + + @validator("dport", "sport") + def parse_range(cls, v): + return parse_range_string(v) + +class Protocols(BaseModel): + icmp: bool = False + ospf: bool = False + tcp: TcpProtocol | None + udp: UdpProtocol | None + vrrp: bool = False + +class Rule(BaseModel): + iff: str | None + protocols: Protocols = Protocols() + src: ZoneName | list[IPvAnyNetwork | FilePath | ZoneName] | None + verdict: Verdict = Verdict.accept + +class ForwardRule(Rule): + dest: ZoneName | list[IPvAnyNetwork | FilePath | ZoneName] | None + +class Filter(BaseModel): + input: list[Rule] = [] + output: list[Rule] = [] + forward: list[ForwardRule] = [] + +# Nat + +class SNat(BaseModel): + addr: IPvAnyAddress + persistent: bool = True + +class Nat(BaseModel): + src: ZoneName | list[IPvAnyNetwork | FilePath | ZoneName] | None + snat: SNat + +# Root model + +class Firewall(BaseModel): + zones: list[Zone] = [] + blacklist: BlackList | None + reverse_path_filter: ReversePathFilter | None + filter: Filter | None + nat: list[Nat] = [] + +def main(): + parser = ArgumentParser() + parser.add_argument("file", type=FileType("r"), help="YAML rule file") + + args = parser.parse_args() + + rules = Firewall(**safe_load(args.file)) + + print(rules) + + return 0 + +if __name__ == "__main__": + main()