This commit is contained in:
korenstin 2025-08-26 16:05:54 +02:00
parent baa0c9fabe
commit 25dc82c5c6
Signed by: korenstin
GPG key ID: 0FC4734F279D20A1
6 changed files with 244 additions and 11 deletions

1
.gitignore vendored Normal file
View file

@ -0,0 +1 @@
__pycache__

View file

@ -12,7 +12,7 @@ class Switch(BaseModel):
class State(BaseModel):
untag: int | None = None
tag: set[int] = []
tag: set[int] = set()
class Room(BaseModel):

View file

@ -1,11 +1,140 @@
import json
from brassage.config import State, Switch
from brassage.switch_api import SwitchApi
def state2dict(state: State):
state_dict = {}
if state.untag is not None:
state_dict[state.untag] = "POM_UNTAGGED"
for vlan in state.tag:
state_dict[vlan] = "POM_TAGGED_STATIC"
return state_dict
class Interface:
def __init__(self, switch: Switch) -> None:
self._switch = switch
async def fetch(self, port: str, timeout: int = 2) -> State | None:
return State(untag=102, tag=[120, 404, 505])
def _config_vlans_ports(self, port: str, api: SwitchApi, timeout: int = 2):
response = api.get("/vlans-ports", timeout=timeout)
async def update(self, port: str, state: State) -> None: ...
if response.status_code <= 204:
data = response.json()
if (
data["collection_result"]["total_elements_count"]
!= data["collection_result"]["filtered_elements_count"]
):
raise Exception("Impossible to get all tuple (vlan, port).")
vlans = [
s for s in data["vlan_port_element"] if s["port_id"] == port
]
tag = []
untag = []
for s in vlans:
if s["port_mode"] == "POM_TAGGED_STATIC":
tag.append(s["vlan_id"])
else:
untag.append(s["vlan_id"])
if len(untag) > 2:
raise ValueError(
f"The number of untagged vlans must be 0 or 1 (not {len(untag)})."
)
untag = None if len(untag) == 0 else untag[0]
return State(untag=untag, tag=tag)
raise Exception(
f"Impossible to get configuration ({response.status_code})"
)
async def fetch(self, port: str, timeout: int = 2) -> State | None:
api = SwitchApi(self._switch.url)
api.login(self._switch.user, self._switch.password)
try:
vlans = self._config_vlans_ports(port, api, timeout)
finally:
api.logout()
return vlans
async def update(self, port: str, state: State, timeout: int = 2) -> None:
api = SwitchApi(self._switch.url)
success = api.login(self._switch.user, self._switch.password)
if not success:
raise Exception("Login failed.")
# 1. Get vlans
try:
vlans = self._config_vlans_ports(port, api, timeout=timeout)
except:
api.logout()
raise Exception("Failed to check vlans")
# 2. Add vlans
vlans_dict = state2dict(vlans)
state_dict = state2dict(state)
for vlan in state_dict:
data = {
"vlan_id": vlan,
"port_id": port,
"port_mode": state_dict[vlan],
}
if vlan not in vlans_dict:
try:
api.post(
"/vlans-ports",
data=json.dumps(data),
timeout=timeout,
)
except:
api.logout()
raise Exception("Failed to set vlans up")
elif vlans_dict[vlan] != state_dict[vlan]:
try:
api.put(
f"/vlans-ports/{vlan}-{port}",
data=json.dumps(data),
timeout=timeout,
)
except:
api.logout()
raise Exception("Failed to set vlans up")
# 3. Check vlans
try:
vlans = self._config_vlans_ports(port, api, timeout=timeout)
except:
api.logout()
if int(vlans.untag) != int(state.untag) or not vlans.tag.issubset(
state.tag
):
api.logout()
raise Exception("Failed to add vlans")
# 4. Remove vlans
vlans_dict = state2dict(vlans)
state_dict = state2dict(state)
for vlan in vlans_dict:
if vlan not in state_dict:
try:
api.delete(f"/vlans-ports/{vlan}-{port}", timeout=timeout)
except:
api.logout(timeout=timeout)
# 5. Check vlans
try:
vlans = self._config_vlans_ports(port, api, timeout=timeout)
except:
api.logout()
raise Exception("Failed to check vlans")
if vlans.untag != state.untag or vlans.tag != state.tag:
api.logout()
raise Exception("Failed to remove vlans")
api.logout()

88
brassage/switch_api.py Normal file
View file

@ -0,0 +1,88 @@
import json
import os
import urllib
import requests
class SwitchApi:
def __init__(self, url, api="v1"):
self.headers = {"Content-Type": "application/json"}
self.url_base = urllib.parse.urljoin(url, f"/rest/{api}")
self.proxies = None
http_proxy = os.getenv("HTTP_PROXY")
all_proxy = os.getenv("ALL_PROXY")
if http_proxy != "":
self.proxies = {"http": http_proxy}
elif all_proxy != "":
self.proxies = {"http": all_proxy}
def login(self, username, password, timeout=2):
"""
Log in to the rest api.
Return True if the connection has succeeded and False otherwise.
"""
data = {"userName": username, "password": password}
response = self.post(
"/login-sessions", data=json.dumps(data), timeout=timeout
)
if response.status_code != 201:
return False
data = response.json()
if "cookie" not in data:
return False
self.headers["cookie"] = data["cookie"]
return True
def logout(self, timeout=2):
"""
Log out of the rest api.
Return True if connection has succeeded and False otherwise.
"""
response = self.delete("/login-sessions", timeout=timeout)
if response.status_code != 204:
return False
self.headers.pop("cookie")
return True
def post(self, url, data=None, timeout=2):
kwargs = {"headers": self.headers}
if data is not None:
kwargs["data"] = data
if self.proxies is not None:
kwargs["proxies"] = self.proxies
return requests.post(self.url_base + url, timeout=timeout, **kwargs)
def get(self, url, data="", timeout=2):
kwargs = {"headers": self.headers}
if data is not None:
kwargs["data"] = data
if self.proxies is not None:
kwargs["proxies"] = self.proxies
return requests.get(self.url_base + url, timeout=timeout, **kwargs)
def delete(self, url, data="", timeout=2):
kwargs = {"headers": self.headers}
if data is not None:
kwargs["data"] = data
if self.proxies is not None:
kwargs["proxies"] = self.proxies
return requests.delete(self.url_base + url, timeout=timeout, **kwargs)
def put(self, url, data="", timeout=2):
kwargs = {"headers": self.headers}
if data is not None:
kwargs["data"] = data
if self.proxies is not None:
kwargs["proxies"] = self.proxies
return requests.put(self.url_base + url, timeout=timeout, **kwargs)

View file

@ -2,12 +2,11 @@ from jinja2 import Environment, PackageLoader, select_autoescape
from starlette.applications import Starlette
from starlette.exceptions import HTTPException
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Mount, Route
from starlette.staticfiles import StaticFiles
from starlette.templating import Jinja2Templates
from brassage.config import Config, State, read_config
from brassage.config import Config, State
from brassage.switch import Interface
templates = Jinja2Templates(
@ -18,7 +17,7 @@ templates = Jinja2Templates(
)
def state_str(state: State | None, states: dict[str, State]) -> str:
def state_str(state: State | None, targets: dict[str, State]) -> str:
def extract():
if state.untag is not None:
yield f"{state.untag}"
@ -27,7 +26,8 @@ def state_str(state: State | None, states: dict[str, State]) -> str:
if state is None:
return "Inconnu"
for name, target in states.items():
for name, target in targets.items():
if state == target:
return name
return f"Invalide ({', '.join(extract())})"

View file

@ -20,6 +20,7 @@ classifiers = [
"Programming Language :: Python :: 3.12",
]
dependencies = [
"requests[socks]",
"starlette",
"aiohttp",
"pydantic>=2",
@ -38,16 +39,30 @@ target-version = "py312"
[tool.ruff.lint]
select = ["ALL"]
ignore = [
"ANN001",
"ANN201",
"ANN202",
"ANN204",
"ASYNC109",
"C901",
"COM812",
"D100",
"D101",
"D102",
"D103",
"D104",
"T201",
"D203",
"D212",
"COM812",
"E722",
"EM101",
"EM102",
"FBT001",
"ISC001",
"FIX",
"ISC001",
"PLR0912",
"PLR2004",
"T201",
"TD",
"TRY002",
"TRY003",
]