Compare commits
1 commit
Author | SHA1 | Date | |
---|---|---|---|
![]() |
25dc82c5c6 |
6 changed files with 244 additions and 11 deletions
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
|
@ -0,0 +1 @@
|
|||
__pycache__
|
|
@ -12,7 +12,7 @@ class Switch(BaseModel):
|
|||
|
||||
class State(BaseModel):
|
||||
untag: int | None = None
|
||||
tag: set[int] = []
|
||||
tag: set[int] = set()
|
||||
|
||||
|
||||
class Room(BaseModel):
|
||||
|
|
|
@ -1,11 +1,140 @@
|
|||
import json
|
||||
|
||||
from brassage.config import State, Switch
|
||||
from brassage.switch_api import SwitchApi
|
||||
|
||||
|
||||
def state2dict(state: State):
|
||||
state_dict = {}
|
||||
if state.untag is not None:
|
||||
state_dict[state.untag] = "POM_UNTAGGED"
|
||||
for vlan in state.tag:
|
||||
state_dict[vlan] = "POM_TAGGED_STATIC"
|
||||
return state_dict
|
||||
|
||||
|
||||
class Interface:
|
||||
def __init__(self, switch: Switch) -> None:
|
||||
self._switch = switch
|
||||
|
||||
async def fetch(self, port: str, timeout: int = 2) -> State | None:
|
||||
return State(untag=102, tag=[120, 404, 505])
|
||||
def _config_vlans_ports(self, port: str, api: SwitchApi, timeout: int = 2):
|
||||
response = api.get("/vlans-ports", timeout=timeout)
|
||||
|
||||
async def update(self, port: str, state: State) -> None: ...
|
||||
if response.status_code <= 204:
|
||||
data = response.json()
|
||||
if (
|
||||
data["collection_result"]["total_elements_count"]
|
||||
!= data["collection_result"]["filtered_elements_count"]
|
||||
):
|
||||
raise Exception("Impossible to get all tuple (vlan, port).")
|
||||
|
||||
vlans = [
|
||||
s for s in data["vlan_port_element"] if s["port_id"] == port
|
||||
]
|
||||
|
||||
tag = []
|
||||
untag = []
|
||||
for s in vlans:
|
||||
if s["port_mode"] == "POM_TAGGED_STATIC":
|
||||
tag.append(s["vlan_id"])
|
||||
else:
|
||||
untag.append(s["vlan_id"])
|
||||
|
||||
if len(untag) > 2:
|
||||
raise ValueError(
|
||||
f"The number of untagged vlans must be 0 or 1 (not {len(untag)})."
|
||||
)
|
||||
untag = None if len(untag) == 0 else untag[0]
|
||||
|
||||
return State(untag=untag, tag=tag)
|
||||
raise Exception(
|
||||
f"Impossible to get configuration ({response.status_code})"
|
||||
)
|
||||
|
||||
async def fetch(self, port: str, timeout: int = 2) -> State | None:
|
||||
api = SwitchApi(self._switch.url)
|
||||
api.login(self._switch.user, self._switch.password)
|
||||
try:
|
||||
vlans = self._config_vlans_ports(port, api, timeout)
|
||||
finally:
|
||||
api.logout()
|
||||
return vlans
|
||||
|
||||
async def update(self, port: str, state: State, timeout: int = 2) -> None:
|
||||
api = SwitchApi(self._switch.url)
|
||||
success = api.login(self._switch.user, self._switch.password)
|
||||
if not success:
|
||||
raise Exception("Login failed.")
|
||||
|
||||
# 1. Get vlans
|
||||
try:
|
||||
vlans = self._config_vlans_ports(port, api, timeout=timeout)
|
||||
except:
|
||||
api.logout()
|
||||
raise Exception("Failed to check vlans")
|
||||
|
||||
# 2. Add vlans
|
||||
vlans_dict = state2dict(vlans)
|
||||
state_dict = state2dict(state)
|
||||
|
||||
for vlan in state_dict:
|
||||
data = {
|
||||
"vlan_id": vlan,
|
||||
"port_id": port,
|
||||
"port_mode": state_dict[vlan],
|
||||
}
|
||||
if vlan not in vlans_dict:
|
||||
try:
|
||||
api.post(
|
||||
"/vlans-ports",
|
||||
data=json.dumps(data),
|
||||
timeout=timeout,
|
||||
)
|
||||
except:
|
||||
api.logout()
|
||||
raise Exception("Failed to set vlans up")
|
||||
elif vlans_dict[vlan] != state_dict[vlan]:
|
||||
try:
|
||||
api.put(
|
||||
f"/vlans-ports/{vlan}-{port}",
|
||||
data=json.dumps(data),
|
||||
timeout=timeout,
|
||||
)
|
||||
except:
|
||||
api.logout()
|
||||
raise Exception("Failed to set vlans up")
|
||||
|
||||
# 3. Check vlans
|
||||
try:
|
||||
vlans = self._config_vlans_ports(port, api, timeout=timeout)
|
||||
except:
|
||||
api.logout()
|
||||
|
||||
if int(vlans.untag) != int(state.untag) or not vlans.tag.issubset(
|
||||
state.tag
|
||||
):
|
||||
api.logout()
|
||||
raise Exception("Failed to add vlans")
|
||||
|
||||
# 4. Remove vlans
|
||||
vlans_dict = state2dict(vlans)
|
||||
state_dict = state2dict(state)
|
||||
|
||||
for vlan in vlans_dict:
|
||||
if vlan not in state_dict:
|
||||
try:
|
||||
api.delete(f"/vlans-ports/{vlan}-{port}", timeout=timeout)
|
||||
except:
|
||||
api.logout(timeout=timeout)
|
||||
|
||||
# 5. Check vlans
|
||||
try:
|
||||
vlans = self._config_vlans_ports(port, api, timeout=timeout)
|
||||
except:
|
||||
api.logout()
|
||||
raise Exception("Failed to check vlans")
|
||||
|
||||
if vlans.untag != state.untag or vlans.tag != state.tag:
|
||||
api.logout()
|
||||
raise Exception("Failed to remove vlans")
|
||||
api.logout()
|
||||
|
|
88
brassage/switch_api.py
Normal file
88
brassage/switch_api.py
Normal file
|
@ -0,0 +1,88 @@
|
|||
import json
|
||||
import os
|
||||
import urllib
|
||||
|
||||
import requests
|
||||
|
||||
|
||||
class SwitchApi:
|
||||
def __init__(self, url, api="v1"):
|
||||
self.headers = {"Content-Type": "application/json"}
|
||||
self.url_base = urllib.parse.urljoin(url, f"/rest/{api}")
|
||||
self.proxies = None
|
||||
|
||||
http_proxy = os.getenv("HTTP_PROXY")
|
||||
all_proxy = os.getenv("ALL_PROXY")
|
||||
if http_proxy != "":
|
||||
self.proxies = {"http": http_proxy}
|
||||
elif all_proxy != "":
|
||||
self.proxies = {"http": all_proxy}
|
||||
|
||||
def login(self, username, password, timeout=2):
|
||||
"""
|
||||
Log in to the rest api.
|
||||
|
||||
Return True if the connection has succeeded and False otherwise.
|
||||
"""
|
||||
data = {"userName": username, "password": password}
|
||||
response = self.post(
|
||||
"/login-sessions", data=json.dumps(data), timeout=timeout
|
||||
)
|
||||
|
||||
if response.status_code != 201:
|
||||
return False
|
||||
|
||||
data = response.json()
|
||||
if "cookie" not in data:
|
||||
return False
|
||||
|
||||
self.headers["cookie"] = data["cookie"]
|
||||
return True
|
||||
|
||||
def logout(self, timeout=2):
|
||||
"""
|
||||
Log out of the rest api.
|
||||
|
||||
Return True if connection has succeeded and False otherwise.
|
||||
"""
|
||||
response = self.delete("/login-sessions", timeout=timeout)
|
||||
if response.status_code != 204:
|
||||
return False
|
||||
self.headers.pop("cookie")
|
||||
return True
|
||||
|
||||
def post(self, url, data=None, timeout=2):
|
||||
kwargs = {"headers": self.headers}
|
||||
if data is not None:
|
||||
kwargs["data"] = data
|
||||
if self.proxies is not None:
|
||||
kwargs["proxies"] = self.proxies
|
||||
|
||||
return requests.post(self.url_base + url, timeout=timeout, **kwargs)
|
||||
|
||||
def get(self, url, data="", timeout=2):
|
||||
kwargs = {"headers": self.headers}
|
||||
if data is not None:
|
||||
kwargs["data"] = data
|
||||
if self.proxies is not None:
|
||||
kwargs["proxies"] = self.proxies
|
||||
|
||||
return requests.get(self.url_base + url, timeout=timeout, **kwargs)
|
||||
|
||||
def delete(self, url, data="", timeout=2):
|
||||
kwargs = {"headers": self.headers}
|
||||
if data is not None:
|
||||
kwargs["data"] = data
|
||||
if self.proxies is not None:
|
||||
kwargs["proxies"] = self.proxies
|
||||
|
||||
return requests.delete(self.url_base + url, timeout=timeout, **kwargs)
|
||||
|
||||
def put(self, url, data="", timeout=2):
|
||||
kwargs = {"headers": self.headers}
|
||||
if data is not None:
|
||||
kwargs["data"] = data
|
||||
if self.proxies is not None:
|
||||
kwargs["proxies"] = self.proxies
|
||||
|
||||
return requests.put(self.url_base + url, timeout=timeout, **kwargs)
|
|
@ -2,12 +2,11 @@ from jinja2 import Environment, PackageLoader, select_autoescape
|
|||
from starlette.applications import Starlette
|
||||
from starlette.exceptions import HTTPException
|
||||
from starlette.requests import Request
|
||||
from starlette.responses import JSONResponse
|
||||
from starlette.routing import Mount, Route
|
||||
from starlette.staticfiles import StaticFiles
|
||||
from starlette.templating import Jinja2Templates
|
||||
|
||||
from brassage.config import Config, State, read_config
|
||||
from brassage.config import Config, State
|
||||
from brassage.switch import Interface
|
||||
|
||||
templates = Jinja2Templates(
|
||||
|
@ -18,7 +17,7 @@ templates = Jinja2Templates(
|
|||
)
|
||||
|
||||
|
||||
def state_str(state: State | None, states: dict[str, State]) -> str:
|
||||
def state_str(state: State | None, targets: dict[str, State]) -> str:
|
||||
def extract():
|
||||
if state.untag is not None:
|
||||
yield f"{state.untag}"
|
||||
|
@ -27,7 +26,8 @@ def state_str(state: State | None, states: dict[str, State]) -> str:
|
|||
|
||||
if state is None:
|
||||
return "Inconnu"
|
||||
for name, target in states.items():
|
||||
|
||||
for name, target in targets.items():
|
||||
if state == target:
|
||||
return name
|
||||
return f"Invalide ({', '.join(extract())})"
|
||||
|
|
|
@ -20,6 +20,7 @@ classifiers = [
|
|||
"Programming Language :: Python :: 3.12",
|
||||
]
|
||||
dependencies = [
|
||||
"requests[socks]",
|
||||
"starlette",
|
||||
"aiohttp",
|
||||
"pydantic>=2",
|
||||
|
@ -38,16 +39,30 @@ target-version = "py312"
|
|||
[tool.ruff.lint]
|
||||
select = ["ALL"]
|
||||
ignore = [
|
||||
"ANN001",
|
||||
"ANN201",
|
||||
"ANN202",
|
||||
"ANN204",
|
||||
"ASYNC109",
|
||||
"C901",
|
||||
"COM812",
|
||||
"D100",
|
||||
"D101",
|
||||
"D102",
|
||||
"D103",
|
||||
"D104",
|
||||
"T201",
|
||||
"D203",
|
||||
"D212",
|
||||
"COM812",
|
||||
"E722",
|
||||
"EM101",
|
||||
"EM102",
|
||||
"FBT001",
|
||||
"ISC001",
|
||||
"FIX",
|
||||
"ISC001",
|
||||
"PLR0912",
|
||||
"PLR2004",
|
||||
"T201",
|
||||
"TD",
|
||||
"TRY002",
|
||||
"TRY003",
|
||||
]
|
||||
|
|
Loading…
Reference in a new issue