diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bee8a64 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +__pycache__ diff --git a/brassage/config.py b/brassage/config.py index f054989..f1d1335 100644 --- a/brassage/config.py +++ b/brassage/config.py @@ -12,7 +12,7 @@ class Switch(BaseModel): class State(BaseModel): untag: int | None = None - tag: set[int] = [] + tag: set[int] = set() class Room(BaseModel): diff --git a/brassage/switch.py b/brassage/switch.py index 4216999..bd455d6 100644 --- a/brassage/switch.py +++ b/brassage/switch.py @@ -1,11 +1,140 @@ +import json + from brassage.config import State, Switch +from brassage.switch_api import SwitchApi + + +def state2dict(state: State): + state_dict = {} + if state.untag is not None: + state_dict[state.untag] = "POM_UNTAGGED" + for vlan in state.tag: + state_dict[vlan] = "POM_TAGGED_STATIC" + return state_dict class Interface: def __init__(self, switch: Switch) -> None: self._switch = switch - async def fetch(self, port: str, timeout: int = 2) -> State | None: - return State(untag=102, tag=[120, 404, 505]) + def _config_vlans_ports(self, port: str, api: SwitchApi, timeout: int = 2): + response = api.get("/vlans-ports", timeout=timeout) - async def update(self, port: str, state: State) -> None: ... + if response.status_code <= 204: + data = response.json() + if ( + data["collection_result"]["total_elements_count"] + != data["collection_result"]["filtered_elements_count"] + ): + raise Exception("Impossible to get all tuple (vlan, port).") + + vlans = [ + s for s in data["vlan_port_element"] if s["port_id"] == port + ] + + tag = [] + untag = [] + for s in vlans: + if s["port_mode"] == "POM_TAGGED_STATIC": + tag.append(s["vlan_id"]) + else: + untag.append(s["vlan_id"]) + + if len(untag) > 2: + raise ValueError( + f"The number of untagged vlans must be 0 or 1 (not {len(untag)})." + ) + untag = None if len(untag) == 0 else untag[0] + + return State(untag=untag, tag=tag) + raise Exception( + f"Impossible to get configuration ({response.status_code})" + ) + + async def fetch(self, port: str, timeout: int = 2) -> State | None: + api = SwitchApi(self._switch.url) + api.login(self._switch.user, self._switch.password) + try: + vlans = self._config_vlans_ports(port, api, timeout) + finally: + api.logout() + return vlans + + async def update(self, port: str, state: State, timeout: int = 2) -> None: + api = SwitchApi(self._switch.url) + success = api.login(self._switch.user, self._switch.password) + if not success: + raise Exception("Login failed.") + + # 1. Get vlans + try: + vlans = self._config_vlans_ports(port, api, timeout=timeout) + except: + api.logout() + raise Exception("Failed to check vlans") + + # 2. Add vlans + vlans_dict = state2dict(vlans) + state_dict = state2dict(state) + + for vlan in state_dict: + data = { + "vlan_id": vlan, + "port_id": port, + "port_mode": state_dict[vlan], + } + if vlan not in vlans_dict: + try: + api.post( + "/vlans-ports", + data=json.dumps(data), + timeout=timeout, + ) + except: + api.logout() + raise Exception("Failed to set vlans up") + elif vlans_dict[vlan] != state_dict[vlan]: + try: + api.put( + f"/vlans-ports/{vlan}-{port}", + data=json.dumps(data), + timeout=timeout, + ) + except: + api.logout() + raise Exception("Failed to set vlans up") + + # 3. Check vlans + try: + vlans = self._config_vlans_ports(port, api, timeout=timeout) + except: + api.logout() + + if int(vlans.untag) != int(state.untag) or not vlans.tag.issubset( + state.tag + ): + api.logout() + raise Exception("Failed to add vlans") + + # 4. Remove vlans + vlans_dict = state2dict(vlans) + state_dict = state2dict(state) + + for vlan in vlans_dict: + if vlan not in state_dict: + try: + api.delete(f"/vlans-ports/{vlan}-{port}", timeout=timeout) + except: + api.logout(timeout=timeout) + + # 5. Check vlans + try: + vlans = self._config_vlans_ports(port, api, timeout=timeout) + except: + api.logout() + raise Exception("Failed to check vlans") + + if vlans.untag != state.untag or vlans.tag != state.tag: + api.logout() + raise Exception("Failed to remove vlans") + api.logout() diff --git a/brassage/switch_api.py b/brassage/switch_api.py new file mode 100644 index 0000000..149d206 --- /dev/null +++ b/brassage/switch_api.py @@ -0,0 +1,88 @@ +import json +import os +import urllib + +import requests + + +class SwitchApi: + def __init__(self, url, api="v1"): + self.headers = {"Content-Type": "application/json"} + self.url_base = urllib.parse.urljoin(url, f"/rest/{api}") + self.proxies = None + + http_proxy = os.getenv("HTTP_PROXY") + all_proxy = os.getenv("ALL_PROXY") + if http_proxy != "": + self.proxies = {"http": http_proxy} + elif all_proxy != "": + self.proxies = {"http": all_proxy} + + def login(self, username, password, timeout=2): + """ + Log in to the rest api. + + Return True if the connection has succeeded and False otherwise. + """ + data = {"userName": username, "password": password} + response = self.post( + "/login-sessions", data=json.dumps(data), timeout=timeout + ) + + if response.status_code != 201: + return False + + data = response.json() + if "cookie" not in data: + return False + + self.headers["cookie"] = data["cookie"] + return True + + def logout(self, timeout=2): + """ + Log out of the rest api. + + Return True if connection has succeeded and False otherwise. + """ + response = self.delete("/login-sessions", timeout=timeout) + if response.status_code != 204: + return False + self.headers.pop("cookie") + return True + + def post(self, url, data=None, timeout=2): + kwargs = {"headers": self.headers} + if data is not None: + kwargs["data"] = data + if self.proxies is not None: + kwargs["proxies"] = self.proxies + + return requests.post(self.url_base + url, timeout=timeout, **kwargs) + + def get(self, url, data="", timeout=2): + kwargs = {"headers": self.headers} + if data is not None: + kwargs["data"] = data + if self.proxies is not None: + kwargs["proxies"] = self.proxies + + return requests.get(self.url_base + url, timeout=timeout, **kwargs) + + def delete(self, url, data="", timeout=2): + kwargs = {"headers": self.headers} + if data is not None: + kwargs["data"] = data + if self.proxies is not None: + kwargs["proxies"] = self.proxies + + return requests.delete(self.url_base + url, timeout=timeout, **kwargs) + + def put(self, url, data="", timeout=2): + kwargs = {"headers": self.headers} + if data is not None: + kwargs["data"] = data + if self.proxies is not None: + kwargs["proxies"] = self.proxies + + return requests.put(self.url_base + url, timeout=timeout, **kwargs) diff --git a/brassage/web.py b/brassage/web.py index f1953ae..0fb5b0d 100644 --- a/brassage/web.py +++ b/brassage/web.py @@ -2,12 +2,11 @@ from jinja2 import Environment, PackageLoader, select_autoescape from starlette.applications import Starlette from starlette.exceptions import HTTPException from starlette.requests import Request -from starlette.responses import JSONResponse from starlette.routing import Mount, Route from starlette.staticfiles import StaticFiles from starlette.templating import Jinja2Templates -from brassage.config import Config, State, read_config +from brassage.config import Config, State from brassage.switch import Interface templates = Jinja2Templates( @@ -18,7 +17,7 @@ templates = Jinja2Templates( ) -def state_str(state: State | None, states: dict[str, State]) -> str: +def state_str(state: State | None, targets: dict[str, State]) -> str: def extract(): if state.untag is not None: yield f"{state.untag}" @@ -27,7 +26,8 @@ def state_str(state: State | None, states: dict[str, State]) -> str: if state is None: return "Inconnu" - for name, target in states.items(): + + for name, target in targets.items(): if state == target: return name return f"Invalide ({', '.join(extract())})" diff --git a/pyproject.toml b/pyproject.toml index cd14b61..4aa0cfc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,7 @@ classifiers = [ "Programming Language :: Python :: 3.12", ] dependencies = [ + "requests[socks]", "starlette", "aiohttp", "pydantic>=2", @@ -38,16 +39,30 @@ target-version = "py312" [tool.ruff.lint] select = ["ALL"] ignore = [ + "ANN001", + "ANN201", + "ANN202", + "ANN204", + "ASYNC109", + "C901", + "COM812", "D100", "D101", + "D102", "D103", "D104", - "T201", "D203", "D212", - "COM812", + "E722", + "EM101", + "EM102", "FBT001", - "ISC001", "FIX", + "ISC001", + "PLR0912", + "PLR2004", + "T201", "TD", + "TRY002", + "TRY003", ]