Provisioning-switchs/generate-conf.py
Michaël Paulon 058451c41d clean
2020-09-30 00:01:52 +02:00

413 lines
18 KiB
Python

import argparse
import difflib
import getpass
import logging
import colorlog
import requests
import termcolor
import yaml
from jinja2 import Template
from core import connect_to_switch, sftp_read_file, sftp_write_file
# Root logger (no name given); its handler and level are set up by the script
# entry point at the bottom of this file.
logger = logging.getLogger()
def syntax_from_range(value, vlan_syntax=False):
    """Compress a collection of integers into the switch range syntax.

    Consecutive numbers are collapsed into ``first-last`` runs. Runs are
    separated by ``,`` by default, or by a single space when ``vlan_syntax``
    is true, in which case a trailing space is also appended to the result.

    Args:
        value: Iterable of integers. Duplicates are ignored and the order of
            the input does not matter.
        vlan_syntax: Use the space-separated form instead of the
            comma-separated one.

    Returns:
        The compact string (for example ``"1-3,5"``), or ``None`` when
        ``value`` is an empty list.
    """
    if value == []:
        return None

    # Sort explicitly: iterating a bare set gives no ordering guarantee, and
    # walking the numbers out of order yields broken ranges.
    numbers = sorted(set(value))

    # Each run is a mutable [first, last] pair of consecutive numbers.
    runs = []
    for number in numbers:
        if runs and number == runs[-1][1] + 1:
            runs[-1][1] = number
        else:
            runs.append([number, number])

    chunks = [
        str(first) if first == last else "{}-{}".format(first, last)
        for first, last in runs
    ]
    syntax = (" " if vlan_syntax else ",").join(chunks)
    if vlan_syntax:
        # The space-separated form always ends with a trailing space.
        syntax += " "
    return syntax
def range_from_syntax(value, vlan_syntax=False):
    """Expand a switch range expression into the list of numbers it covers.

    The expression is a list of items separated by ``,`` (or by whitespace
    when ``vlan_syntax`` is true). Each item is either a single number or an
    inclusive ``first-last`` range. Empty items are skipped, so the
    trailing-space form produced for VLAN lists can be parsed back.

    Args:
        value: The expression. Any non-``None`` value is converted with
            ``str()`` first, so a bare integer is accepted.
        vlan_syntax: Items are separated by whitespace instead of commas.

    Returns:
        The list of integers, in the order they appear in the expression.
        ``None`` gives an empty list.

    Raises:
        RuntimeError: An item contains more than one ``-``.
        ValueError: A bound is not a valid integer.
    """
    if value is None:
        return []

    text = str(value)
    tokens = text.split() if vlan_syntax else text.split(",")

    ret_range = []
    for token in tokens:
        token = token.strip()
        if not token:
            # Leading/trailing/doubled separators carry no information.
            continue
        bounds = token.split("-")
        if len(bounds) == 2:
            ret_range.extend(range(int(bounds[0]), int(bounds[1]) + 1))
        elif len(bounds) == 1:
            ret_range.append(int(bounds[0]))
        else:
            raise RuntimeError("Invalid range expression: {!r}".format(token))
    return ret_range
def gen_vlan(vlan_number, switch_config, dhcp_snooping_vlans):
    """Build the template entry of one VLAN from its YAML description.

    The description is read from ``configs/vlans/<vlan_number>.yml``.

    Args:
        vlan_number: Number of the VLAN, used to locate its YAML file.
        switch_config: Mapping describing the switch; the ``ipv4-addr``,
            ``ipv4-subnet``, ``ipv6-addr`` and ``ipv6-subnet`` keys are read
            when the VLAN is flagged ``has_ip``.
        dhcp_snooping_vlans: List extended in place with ``vlan_number`` when
            DHCP snooping is enabled for the VLAN (the default).

    Returns:
        A ``(vlan, dhcp_snooping_vlans)`` tuple. ``vlan`` holds ``name``,
        empty ``tagged``/``untagged`` lists and, for VLANs with an IP, an
        ``ip`` mapping. The second item is the list passed in.

    Raises:
        RuntimeError: The VLAN is flagged ``has_ip`` but the switch provides
            neither a complete IPv4 nor a complete IPv6 address/subnet pair.
    """
    # NOTE(review): yaml.Loader can build arbitrary Python objects; this is
    # only acceptable because the files under configs/ are trusted.
    with open("configs/vlans/{}.yml".format(vlan_number), "r") as vlan_file:
        vlan_config = yaml.load(vlan_file, yaml.Loader)

    vlan = {
        "name": vlan_config.get("name"),
        "tagged": [],
        "untagged": [],
    }

    if vlan_config.get("dhcp_snooping", True):
        dhcp_snooping_vlans.append(int(vlan_number))

    if vlan_config.get("has_ip"):
        vlan["ip"] = {}

        v4 = switch_config.get("ipv4-addr")
        v4_sub = switch_config.get("ipv4-subnet")
        if v4 is not None and v4_sub is not None:
            logger.debug("Adding {} {} to vlan {}".format(v4, v4_sub, vlan_number))
            vlan["ip"]["addr"] = v4
            vlan["ip"]["subnet"] = v4_sub

        v6 = switch_config.get("ipv6-addr")
        v6_sub = switch_config.get("ipv6-subnet")
        if v6 is not None and v6_sub is not None:
            logger.debug("Adding {}/{} to vlan {}".format(v6, v6_sub, vlan_number))
            vlan["ip"]["addr6"] = v6
            vlan["ip"]["subnet6"] = v6_sub

        if vlan["ip"] == {}:
            message = "Missing IPs or subnets configurations for vlan {}".format(vlan_number)
            logger.critical(message)
            raise RuntimeError(message)

    return vlan, dhcp_snooping_vlans
def gen_interfaces(switch_config):
    """Build the interface and VLAN data of a switch from its YAML files.

    Every entry of ``switch_config["interfaces"]`` maps a port expression
    (parsed with ``range_from_syntax``) to a mapping naming a profile, which
    is loaded from ``configs/profiles/<profile>.yml``.

    Args:
        switch_config: Mapping describing the switch. The ``interfaces`` and
            ``nb_ports`` keys are read here; the whole mapping is also handed
            to ``gen_vlan``.

    Returns:
        A tuple ``(interfaces, vlans, mac_based_ports, ra_guard_ports,
        dhcp_snooping_vlans)``. ``interfaces`` is sorted by port number and
        the ``tagged``/``untagged`` members of each VLAN are already
        converted to range strings.

    Raises:
        RuntimeError: The declared ports do not match ``nb_ports`` (highest
            port number and number of distinct ports must both equal it).
    """
    dhcp_snooping_vlans = []
    mac_based_ports = []
    ra_guard_ports = []
    ifaces_done = []
    interfaces = []
    vlans = {}

    for iface, if_config in switch_config.get("interfaces").items():
        profile_path = "configs/profiles/{}.yml".format(if_config.get("profile"))
        # NOTE(review): yaml.Loader can build arbitrary Python objects; this
        # is only acceptable because the files under configs/ are trusted.
        with open(profile_path) as profile_file:
            if_profile = yaml.load(profile_file, yaml.Loader)

        for iface_number in range_from_syntax(iface):
            # NOTE(review): nesting reconstructed -- an ignored port gets no
            # interface entry but is assumed to still count as declared and
            # to still be attached to its VLANs. Confirm against the
            # reference copy of this script.
            if not if_profile.get("ignore_port", False):
                interface = {
                    "number": iface_number,
                }
                generic = if_profile.get("generic_name", None)
                if generic is None:
                    interface["name"] = if_config["name"]
                else:
                    interface["name"] = "TODO {} {}".format(generic, iface_number)
                interface["mac_based"] = if_profile.get("mac_based", False)
                if interface["mac_based"]:
                    mac_based_ports.append(iface_number)
                if if_profile.get("ra_guard", True):
                    ra_guard_ports.append(iface_number)
                interface["addr_limit"] = if_profile.get("addr_limit", 5)
                interface["logoff"] = if_profile.get("logoff", 3600)
                interfaces.append(interface)

            ifaces_done.append(iface_number)

            for vlan in range_from_syntax(if_profile.get("vlans").get("untagged")):
                if vlans.get(vlan) is None:
                    vlans[vlan], dhcp_snooping_vlans = gen_vlan(vlan, switch_config, dhcp_snooping_vlans)
                vlans[vlan]["untagged"].append(iface_number)
            for vlan in range_from_syntax(if_profile.get("vlans").get("tagged")):
                if vlans.get(vlan) is None:
                    vlans[vlan], dhcp_snooping_vlans = gen_vlan(vlan, switch_config, dhcp_snooping_vlans)
                vlans[vlan]["tagged"].append(iface_number)

    ifaces_done = set(ifaces_done)

    # Turn the per-VLAN port lists into range strings.
    for v_conf in vlans.values():
        v_conf["tagged"] = syntax_from_range(v_conf["tagged"])
        v_conf["untagged"] = syntax_from_range(v_conf["untagged"])

    nb_ports = switch_config.get("nb_ports")
    # Checked first so that an empty declaration gives the same error as any
    # other mismatch instead of failing inside max().
    if not ifaces_done or max(ifaces_done) != nb_ports or len(ifaces_done) != nb_ports:
        raise RuntimeError(
            "Declared interfaces do not match nb_ports={}: {} distinct port(s), highest is {}".format(
                nb_ports,
                len(ifaces_done),
                max(ifaces_done) if ifaces_done else None,
            )
        )

    interfaces.sort(key=lambda x: x["number"])
    return interfaces, vlans, mac_based_ports, ra_guard_ports, dhcp_snooping_vlans
def get_header(old_config):
    """Return the first two lines of ``old_config``, joined by a newline."""
    # Two splits are enough: only the first two pieces are kept.
    leading_lines = old_config.split("\n", 2)[:2]
    return "\n".join(leading_lines)
def conf_from_dict(config_dict):
    """Render ``configs/config.j2`` with ``config_dict`` as template context.

    Returns the rendered configuration text.
    """
    with open("configs/config.j2", "r") as template_file:
        template_source = template_file.read()
    return Template(template_source).render(config_dict)
def gen_conf(master_config, switch_config, header):
    """Render the configuration of a switch described by the YAML files.

    Args:
        master_config: Mapping with the site-wide settings.
        switch_config: Mapping describing the switch; ``location`` is
            mandatory, ``hostname`` is optional.
        header: Text exposed to the template as ``header``.

    Returns:
        The rendered configuration text.
    """
    generated = gen_interfaces(switch_config)
    interfaces, vlans, mac_based_ports, ra_guard_ports, dhcp_snooping_vlans = generated

    context = {"header": header}

    # Per-switch values.
    context["hostname"] = switch_config.get("hostname")
    context["location"] = switch_config["location"]
    context["interfaces"] = interfaces
    context["vlans"] = vlans

    # Site-wide values exposed under the same name as in the master config.
    for key in ("dhcp_servers", "dhcpv6_servers", "snmp_user", "dns", "radius_servers", "unauth_redirect"):
        context[key] = master_config.get(key)

    # Site-wide values exposed under a different name.
    context["sntp"] = master_config.get("sntp_servers")
    context["ipv4_managers"] = master_config.get("managers_v4", {})
    context["ipv6_managers"] = master_config.get("managers_v6", {})

    # Port and VLAN lists in switch range syntax.
    context["mac_based_ports"] = syntax_from_range(mac_based_ports)
    context["ra_guard_ports"] = syntax_from_range(ra_guard_ports)
    context["dhcp_snooping_vlans"] = syntax_from_range(dhcp_snooping_vlans, vlan_syntax=True)

    return conf_from_dict(context)
def gen_conf_re2o(re2o_config, header, master_config=None):
    """Render the configuration of a switch from data fetched from re2o.

    Args:
        re2o_config: Mapping describing the switch. The keys read here are
            ``switchs_management_utils``, ``vlans``, ``ports``,
            ``interfaces_subnet``, ``interfaces6_subnet``,
            ``get_radius_key_value``, ``switchbay`` and ``short_name``.
            It is not modified.
        header: Text exposed to the template as ``header``.
        master_config: Mapping with the site-wide settings
            (``radius_logoff``, ``snmp_user`` and ``unauth_redirect`` are
            read). When omitted, the module-level ``master_config`` set by
            the script entry point is used, as this function historically
            did.

    Returns:
        The rendered configuration text.

    Raises:
        RuntimeError: No master configuration is available.
    """
    if master_config is None:
        # Backward compatibility with callers passing only two arguments.
        master_config = globals().get("master_config")
        if master_config is None:
            raise RuntimeError("gen_conf_re2o needs the master configuration")

    mgmt_utils = re2o_config.get("switchs_management_utils")

    ipv4_managers = {}
    for m in mgmt_utils.get("subnet"):
        ipv4_managers[m.get("network")] = {"ip": m.get("network"), "subnet": m.get("netmask")}

    ipv6_managers = {}
    # Unlike "subnet", "subnet6" is a single subnet and not a list of them.
    subnet6 = mgmt_utils.get("subnet6")
    if subnet6 is not None:
        ipv6_managers[subnet6.get("network")] = {
            "ip": subnet6.get("network"),
            "subnet": subnet6.get("netmask"),
        }

    # Index the ports by VLAN once instead of rescanning them for each VLAN.
    tagged_ports = {}
    untagged_ports = {}
    for port in re2o_config.get("ports"):
        port_profile = port["get_port_profile"]
        for v in port_profile["vlan_tagged"]:
            tagged_ports.setdefault(v["vlan_id"], []).append(port["port"])
        # A port has at most one untagged VLAN.
        untagged = port_profile["vlan_untagged"]
        if untagged is not None:
            untagged_ports.setdefault(untagged["vlan_id"], []).append(port["port"])

    vlans = {}
    dhcp_snooping_vlans = []
    dhcpv6_snooping_vlans = []
    arp_protect = {"vlans": []}
    for vlan in re2o_config.get("vlans"):
        vlan_id = vlan["vlan_id"]
        # Work on a copy so the caller's data is left untouched.
        vlan_entry = dict(vlan)
        vlans[vlan_id] = vlan_entry

        vlan_entry["tagged"] = syntax_from_range(tagged_ports.get(vlan_id, []))
        vlan_entry["untagged"] = syntax_from_range(untagged_ports.get(vlan_id, []))

        # Attach the addresses to the VLANs that have some.
        for address, iface in re2o_config.get("interfaces_subnet", dict()).items():
            # Each IPv4 address maps to a list of entries.
            for i in iface:
                if i["vlan_id"] == vlan_id:
                    if vlan_entry.get("ip") is None:
                        vlan_entry["ip"] = {}
                    vlan_entry["ip"]["addr"] = address
                    vlan_entry["ip"]["subnet"] = i["netmask"]
        for address, iface in re2o_config.get("interfaces6_subnet", dict()).items():
            if iface["vlan_id"] == vlan_id:
                if vlan_entry.get("ip") is None:
                    vlan_entry["ip"] = {}
                vlan_entry["ip"]["addr6"] = address
                vlan_entry["ip"]["subnet6"] = iface["netmask_cidr"]

        # Record the VLANs that need snooping / ARP protection.
        if vlan["dhcp_snooping"]:
            dhcp_snooping_vlans.append(vlan_id)
        if vlan["dhcpv6_snooping"]:
            dhcpv6_snooping_vlans.append(vlan_id)
        if vlan["arp_protect"]:
            arp_protect["vlans"].append(vlan_id)
        vlan_entry["ipv6_mld"] = vlan["mld"]

    # Collect the per-port settings.
    mac_based_ports = []
    ra_guard_ports = []
    interfaces = []
    loop_protect = {"ports": []}
    for port in re2o_config.get("ports"):
        port_profile = port["get_port_profile"]
        if port_profile["ra_guard"]:
            ra_guard_ports.append(port["port"])
        if port_profile["loop_protect"]:
            loop_protect["ports"].append(port["port"])
        iface = {
            "name": port["pretty_name"],
            "number": port["port"],
            "dhcp_trust": not port_profile["dhcp_snooping"],
            "dhcpv6_trust": not port_profile["dhcpv6_snooping"],
            "flowcontrol": port_profile["flow_control"],
            "arp_trust": not port_profile["arp_protect"],
        }
        # NOTE(review): nesting reconstructed -- the MAC authentication
        # settings are assumed to apply to MAC-radius ports only. Confirm
        # against the reference copy of this script.
        if port_profile["radius_type"] == "MAC-radius":
            mac_based_ports.append(port["port"])
            iface["mac_based"] = True
            iface["addr_limit"] = port_profile["mac_limit"]
            iface["logoff"] = master_config.get("radius_logoff")
        interfaces.append(iface)

    loop_protect["ports"] = syntax_from_range(loop_protect["ports"])
    arp_protect["vlans"] = syntax_from_range(arp_protect["vlans"], vlan_syntax=True)
    interfaces.sort(key=lambda x: x["number"])

    # Every RADIUS server shares the same secret.
    radius_key = re2o_config.get("get_radius_key_value")
    radius_servers = [
        {"ip": i, "secret": radius_key}
        for i in mgmt_utils["radius_servers"]["ipv4"] + mgmt_utils["radius_servers"]["ipv6"]
    ]

    config_dict = {
        "header": header,
        "location": re2o_config.get("switchbay").get("name"),
        "hostname": re2o_config.get("short_name"),
        "dhcp_servers": mgmt_utils.get("dhcp_servers").get("ipv4"),
        "dhcpv6_servers": mgmt_utils.get("dhcp_servers").get("ipv6"),
        "ipv4_managers": ipv4_managers,
        "ipv6_managers": ipv6_managers,
        "vlans": vlans,
        "dhcp_snooping_vlans": syntax_from_range(dhcp_snooping_vlans, vlan_syntax=True),
        "dhcpv6_snooping_vlans": syntax_from_range(dhcpv6_snooping_vlans, vlan_syntax=True),
        "logging": mgmt_utils["log_servers"]["ipv4"] + mgmt_utils["log_servers"]["ipv6"],
        "sntp": mgmt_utils["ntp_servers"]["ipv4"] + mgmt_utils["ntp_servers"]["ipv6"],
        "dns": mgmt_utils["dns_recursive_servers"]["ipv4"] + mgmt_utils["dns_recursive_servers"]["ipv6"],
        "radius_servers": radius_servers,
        "snmp_user": master_config.get("snmp_user"),
        "unauth_redirect": master_config.get("unauth_redirect"),
        "mac_based_ports": syntax_from_range(mac_based_ports),
        "ra_guard_ports": syntax_from_range(ra_guard_ports),
        "loop_protect": loop_protect,
        "arp_protect": arp_protect,
        "interfaces": interfaces,
    }
    return conf_from_dict(config_dict)
def connect_as_self(base_url, username=None):
    """Get a re2o API token, by default for the current unix user.

    The password is asked interactively, then posted together with the
    username to ``<base_url>token-auth``.

    Args:
        base_url: Base URL of the API; the endpoint name is appended as is.
        username: Account to log in as. Defaults to the current unix user.

    Returns:
        The ``token`` field of the JSON answer (``None`` if it is absent).

    Raises:
        SystemExit: With status 1 when the server does not answer 200.
    """
    if username is None:
        username = getpass.getuser()
    password = getpass.getpass("Login to {} as {} with password :".format(base_url, username))

    # Without a timeout an unreachable server would block the script forever.
    r = requests.post(
        base_url + "token-auth",
        data={
            "username": username,
            "password": password,
        },
        timeout=30,
    )
    if r.status_code != 200:
        logger.critical("Wrong login/password (HTTP status {})".format(r.status_code))
        # Same effect as the interactive ``exit`` helper, without depending
        # on the ``site`` module having installed it.
        raise SystemExit(1)
    return r.json().get("token")
def get_switch_from_results(results, switch_name):
    """Return the first entry whose ``short_name`` is ``switch_name``.

    ``None`` is returned when no entry of ``results`` matches.
    """
    matching = (entry for entry in results if entry.get("short_name") == switch_name)
    return next(matching, None)
def get_switch_from_re2o(re2o_instance, switch_name, re2o_user):
    """Fetch everything re2o knows about a switch.

    The paginated ``switchs/ports-config`` listing is walked until an entry
    whose ``short_name`` is ``switch_name`` is found. That entry is then
    completed with the content of ``preferences/optionaltopologie`` and with
    the VLAN list of ``machines/vlan`` (stored under ``vlans``).

    Args:
        re2o_instance: Base URL of the re2o instance, without ``/api/``.
        switch_name: Short name of the switch to look for.
        re2o_user: Account used to log in, or ``None`` for the current unix
            user.

    Returns:
        The merged switch description.

    Raises:
        RuntimeError: No switch named ``switch_name`` exists in the listing.
    """
    base_url = "{}/api/".format(re2o_instance)
    token = connect_as_self(base_url, re2o_user)
    headers = {"Authorization": "Token " + token}
    # Without a timeout an unreachable server would block the script forever.
    timeout = 30

    # Look for the switch, one page of results at a time.
    page = requests.get(base_url + "switchs/ports-config", headers=headers, timeout=timeout).json()
    sw_config = get_switch_from_results(page["results"], switch_name)
    while sw_config is None and page.get("next"):
        page = requests.get(page.get("next"), headers=headers, timeout=timeout).json()
        sw_config = get_switch_from_results(page["results"], switch_name)
    if sw_config is None:
        raise RuntimeError("Switch {!r} not found on {}".format(switch_name, re2o_instance))

    # Global settings: DHCP servers, managers, etc.
    r = requests.get(base_url + "preferences/optionaltopologie", headers=headers, timeout=timeout)
    sw_config.update(r.json())

    # List of VLANs.
    r = requests.get(base_url + "machines/vlan", headers=headers, timeout=timeout)
    sw_config.update({"vlans": r.json().get("results")})
    return sw_config
if __name__ == "__main__":
    # Script entry point: generate the configuration of one switch, show the
    # diff against its running configuration and optionally upload it.
    #
    # The code is deliberately kept at module level: ``master_config`` must
    # remain a module global because ``gen_conf_re2o`` reads it.

    # Only needed to stage the generated configuration before the upload.
    import os
    import tempfile

    format_string = "%(asctime)s - %(levelname)s - %(message)s"
    stdout_handler = logging.StreamHandler()
    stdout_formatter = colorlog.ColoredFormatter("%(log_color)s{}".format(format_string))
    stdout_handler.setFormatter(stdout_formatter)
    logger.addHandler(stdout_handler)
    logger.setLevel(logging.DEBUG)

    parser = argparse.ArgumentParser(description="Script de déploiement de la configuration des switchs")
    parser.add_argument(
        "-d", "--dry-run", action="store_true",
        help="Génère la nouvelle configuration et affiche le diff "
             "sans tenter de l'appliquer",
    )
    parser.add_argument("-w", "--whole", action="store_true", help="Affiche la configuration en entier au lieu du diff")
    parser.add_argument("switch_name", type=str, help="Génère le template de ce switch")
    parser.add_argument("-H", "--host", type=str, required=False, help="Host sur lequel de déployer la configuration au lieu de l'adresse dans le template")
    parser.add_argument("-r", "--re2o", type=str, required=False, help="Si renseigné, la configuration sera récupérée depuis l'instance de re2o indiquée au lieu d'utiliser les fichiers yaml")
    parser.add_argument("--re2o-user", type=str, required=False, help="Permet de choisir l'user pour se connecter à l'api re2o, par défaut on prend l'user unix courant")
    parser.add_argument("-4", "--force-ipv4", action="store_true", help="Force le provisionning en ipv4")
    args = parser.parse_args()

    # The master configuration is needed whatever the source of the switch
    # description is.
    # NOTE(review): yaml.Loader can build arbitrary Python objects; this is
    # only acceptable because the files under configs/ are trusted.
    logger.debug("Loading master config")
    with open("configs/master.yml", "r") as master_file:
        master_config = yaml.load(master_file, yaml.Loader)

    if args.re2o:
        logger.debug("Loading config from re2o")
        re2o_config = get_switch_from_re2o(args.re2o, args.switch_name, args.re2o_user)
    else:
        logger.debug("Loading config for {}".format(args.switch_name))
        with open("configs/switches/{}.yml".format(args.switch_name), "r") as switch_file:
            switch_config = yaml.load(switch_file, yaml.Loader)

    # Address used to reach the switch: explicit host first, then the
    # address found in the configuration (IPv6 preferred unless -4 is given).
    if args.host:
        switch_address = args.host
    elif args.re2o:
        if args.force_ipv4:
            switch_address = re2o_config.get("ipv4")
        else:
            switch_address = re2o_config.get("ipv6") or re2o_config.get("ipv4")
    else:
        if args.force_ipv4:
            switch_address = switch_config.get("ipv4-addr")
        else:
            switch_address = switch_config.get("ipv6-addr") or switch_config.get("ipv4-addr")
    # NOTE(review): nesting reconstructed -- the DNS fallback is assumed to
    # apply whichever branch above left the address unset.
    if switch_address is None:
        switch_address = "{}.switches.crans.org".format(args.switch_name)  # TODO: crans

    logger.info("Connecting to {} with address {}".format(args.switch_name, switch_address))
    # TODO: specify the path of the key
    session = connect_to_switch(switch_address, user="root", key=master_config.get("ssh_private_key"))
    old_config = sftp_read_file(session, "cfg/running-config").decode("utf-8")
    header = get_header(old_config)

    # Generate the configuration.
    logger.info("Generating configuration for {}".format(args.switch_name))
    if args.re2o:
        configuration = gen_conf_re2o(re2o_config, header)
    else:
        configuration = gen_conf(master_config, switch_config, header)

    # Show the diff against the running configuration.
    for line in difflib.unified_diff(old_config.split("\n"), configuration.split("\n"), fromfile='origin', tofile='new', lineterm=""):
        if line.startswith("-"):
            termcolor.cprint(line, "red")
        elif line.startswith("+"):
            termcolor.cprint(line, "green")
        elif line.startswith("@"):
            termcolor.cprint(line, "yellow")
        else:
            print(line)

    if args.dry_run or input("Voulez-vous déployer la configuration sur le switch ? y/[n]").lower() not in ["o","y"]:
        logger.info("Aborting deployement")
        raise SystemExit(0)

    # The configuration embeds the RADIUS secrets: stage it in a private,
    # unpredictable temporary file and remove it once uploaded.
    with tempfile.NamedTemporaryFile("w", prefix="conf_tmp", delete=False) as conf_tmp:
        conf_tmp.write(configuration)
    try:
        del session
        logger.info("Uploading configuration for {}".format(args.switch_name))
        session = connect_to_switch(switch_address, user="crans", key=master_config.get("ssh_private_key"))
        sftp_write_file(session, conf_tmp.name, "cfg/startup-config")
    finally:
        os.remove(conf_tmp.name)
    logger.info("Deployement done for {}".format(args.switch_name))
# The two lines below are hard-coded in the Crans templates and are removed
# from the generated configuration. They drop two multicast addresses that
# appear to have been picked arbitrarily; see
# https://gitlab.crans.org/nounous/scripts/-/commit/af2d158ff1a99dee2f032a81bf14de4144d2b82f
#-filter multicast 01005e-0000fb drop 1-52
#-filter multicast 333300-0000fb drop 1-52