Provisioning-switchs/generate-conf.py
2020-09-29 23:55:14 +02:00

423 lines
18 KiB
Python

import argparse
import difflib
import getpass
import logging
from pprint import pprint
import colorlog
import requests
import termcolor
import yaml
from jinja2 import Template
from core import connect_to_switch, sftp_read_file, sftp_write_file
logger = logging.getLogger()
def syntax_from_range(value, vlan_syntax=False):
    """Collapse a collection of integers into a compact range expression.

    Consecutive numbers are merged into ``first-last`` runs; isolated numbers
    are emitted as-is. Runs are separated by ``,`` or, when *vlan_syntax* is
    true, by a single space with one extra trailing space appended.

    Args:
        value: Iterable of integers (port numbers or VLAN ids). Duplicates
            are ignored and the input does not need to be sorted.
        vlan_syntax: Use the space-separated form instead of the
            comma-separated one.

    Returns:
        The formatted string, or ``None`` when *value* is empty.
    """
    # Sort explicitly: the iteration order of a set is an implementation
    # detail, and the run detection below needs ascending numbers.
    numbers = sorted(set(value or ()))
    if not numbers:
        return None

    # Group the sorted numbers into (first, last) runs of consecutive values.
    runs = []
    first = last = numbers[0]
    for number in numbers[1:]:
        if number != last + 1:
            runs.append((first, last))
            first = number
        last = number
    runs.append((first, last))

    separator = " " if vlan_syntax else ","
    syntax = separator.join(
        str(start) if start == end else "{}-{}".format(start, end)
        for start, end in runs
    )
    if vlan_syntax:
        syntax += " "
    return syntax
def range_from_syntax(value, vlan_syntax=False):
    """Expand a range expression into the list of integers it denotes.

    Each element of the expression is either a single number (``"7"``) or an
    inclusive ``first-last`` range (``"1-4"``). Elements are separated by
    commas, or by whitespace when *vlan_syntax* is true.

    Args:
        value: The expression. ``None`` yields an empty list; any other value
            is converted with ``str()`` first, so a bare integer is accepted.
        vlan_syntax: Split on whitespace instead of commas.

    Returns:
        A list of integers, in the order they appear in the expression.

    Raises:
        RuntimeError: If an element contains more than one ``-``.
        ValueError: If a bound is not a valid integer.
    """
    if value is None:
        return []
    text = str(value)
    tokens = text.split() if vlan_syntax else text.split(",")
    ret_range = []
    for token in tokens:
        token = token.strip()
        if not token:
            # Empty elements come from an empty expression or a trailing
            # separator (syntax_from_range appends one in vlan syntax).
            continue
        bounds = token.split("-")
        if len(bounds) == 1:
            ret_range.append(int(bounds[0]))
        elif len(bounds) == 2:
            ret_range.extend(range(int(bounds[0]), int(bounds[1]) + 1))
        else:
            raise RuntimeError(
                "Invalid range element {!r} in {!r}".format(token, text)
            )
    return ret_range
def gen_vlan(vlan_number, switch_config, dhcp_snooping_vlans):
    """Build the template entry for one VLAN from ``configs/vlans/<n>.yml``.

    Args:
        vlan_number: VLAN id; selects the YAML file to load.
        switch_config: Switch description. Its ``ipv4-addr``/``ipv4-subnet``
            and ``ipv6-addr``/``ipv6-subnet`` keys are read when the VLAN is
            flagged ``has_ip``.
        dhcp_snooping_vlans: List that receives ``int(vlan_number)`` when the
            VLAN has DHCP snooping enabled (the default). It is modified in
            place and also returned.

    Returns:
        A ``(vlan, dhcp_snooping_vlans)`` tuple. ``vlan`` holds ``name``,
        empty ``tagged``/``untagged`` lists and, for ``has_ip`` VLANs, an
        ``ip`` dict.

    Raises:
        RuntimeError: If the VLAN is flagged ``has_ip`` but the switch
            provides neither a complete IPv4 nor a complete IPv6 pair.
    """
    vlan_path = "configs/vlans/{}.yml".format(vlan_number)
    # Context manager so the file handle is closed as soon as it is parsed.
    with open(vlan_path, "r") as vlan_file:
        # NOTE: yaml.Loader can build arbitrary Python objects; this is only
        # acceptable because the file is a trusted local configuration file.
        vlan_config = yaml.load(vlan_file, yaml.Loader)

    vlan = {
        "name": vlan_config.get("name"),
        "tagged": list(),
        "untagged": list(),
    }
    if vlan_config.get("dhcp_snooping", True):
        dhcp_snooping_vlans.append(int(vlan_number))

    if vlan_config.get("has_ip"):
        vlan["ip"] = dict()
        v4 = switch_config.get("ipv4-addr")
        v4_sub = switch_config.get("ipv4-subnet")
        if v4 is not None and v4_sub is not None:
            logger.debug("Adding {} {} to vlan {}".format(v4, v4_sub, vlan_number))
            vlan["ip"]["addr"] = v4
            vlan["ip"]["subnet"] = v4_sub
        v6 = switch_config.get("ipv6-addr")
        v6_sub = switch_config.get("ipv6-subnet")
        if v6 is not None and v6_sub is not None:
            logger.debug("Adding {}/{} to vlan {}".format(v6, v6_sub, vlan_number))
            vlan["ip"]["addr6"] = v6
            vlan["ip"]["subnet6"] = v6_sub
        if not vlan["ip"]:
            message = "Missing IPs or subnets configurations for vlan {}".format(vlan_number)
            logger.critical(message)
            # Carry the message in the exception too, so the traceback is
            # self-explanatory even without the log output.
            raise RuntimeError(message)
    return vlan, dhcp_snooping_vlans
def gen_interfaces(switch_config):
# Build the per-port and per-VLAN data of one switch from its YAML description.
#
# Reads switch_config["interfaces"] (a mapping of port-range expression ->
# port settings) and, for each entry, the profile file
# configs/profiles/<profile>.yml.
#
# Returns the tuple
#   (interfaces, vlans, mac_based_ports, ra_guard_ports, dhcp_snooping_vlans)
# where `interfaces` is sorted by port number and the "tagged"/"untagged"
# members of each VLAN have been converted to range strings.
#
# Raises RuntimeError when the ports seen do not match switch_config["nb_ports"].
#
# NOTE(review): this copy of the file has lost its indentation, so the exact
# extent of the `if not if_profile.get("ignore_port", False):` body below
# cannot be confirmed from here — check it against the repository.
dhcp_snooping_vlans = list()
mac_based_ports = list()
ra_guard_ports = list()
ifaces_done = list()
interfaces = list()
vlans = dict()
# NOTE(review): has_ip is assigned but never read in this function.
has_ip = False
for iface, if_config in switch_config.get("interfaces").items():
# NOTE(review): the profile file is opened without ever being closed explicitly.
if_profile = yaml.load(open("configs/profiles/{}.yml".format(if_config.get("profile"))), yaml.Loader)
# The key is a range expression ("1-4,7"); expand it to individual port numbers.
iface = range_from_syntax(iface)
for iface_number in iface:
if not if_profile.get("ignore_port", False):
interface = {
"number": iface_number,
}
# A profile with a generic_name gets a placeholder name per port;
# otherwise the name comes from the switch file entry.
generic = if_profile.get("generic_name", None)
if generic is None:
interface["name"] = if_config["name"]
else:
interface["name"] = "TODO {} {}".format(generic, iface_number)
interface["mac_based"] = if_profile.get("mac_based", False)
if interface["mac_based"]:
mac_based_ports.append(iface_number)
# ra_guard defaults to enabled when the profile does not mention it.
if if_profile.get("ra_guard", True):
ra_guard_ports.append(iface_number)
interface["addr_limit"] = if_profile.get("addr_limit", 5)
interface["logoff"] = if_profile.get("logoff", 3600)
interfaces.append(interface)
ifaces_done.append(iface_number)
# Register the port on every VLAN of its profile; a VLAN is loaded through
# gen_vlan the first time it is seen.
for vlan in range_from_syntax(if_profile.get("vlans").get("untagged")):
if vlans.get(vlan) is None:
vlans[vlan], dhcp_snooping_vlans = gen_vlan(vlan, switch_config, dhcp_snooping_vlans)
vlans[vlan]["untagged"].append(iface_number)
for vlan in range_from_syntax(if_profile.get("vlans").get("tagged")):
if vlans.get(vlan) is None:
vlans[vlan], dhcp_snooping_vlans = gen_vlan(vlan, switch_config, dhcp_snooping_vlans)
vlans[vlan]["tagged"].append(iface_number)
# Deduplicate so that len() below counts distinct ports.
ifaces_done = set(ifaces_done)
# reformat the interface lists of the VLANs into range strings
for vlan, v_conf in vlans.items():
vlans[vlan]["tagged"] = syntax_from_range(v_conf["tagged"])
vlans[vlan]["untagged"] = syntax_from_range(v_conf["untagged"])
# Sanity check: both the highest port number and the number of distinct ports
# must equal nb_ports.
if max(ifaces_done) != switch_config.get("nb_ports") or len(ifaces_done) != switch_config.get("nb_ports"):
raise RuntimeError("Interfaces fucked")
interfaces.sort(key=lambda x: x["number"])
return interfaces, vlans, mac_based_ports, ra_guard_ports, dhcp_snooping_vlans
def get_header(old_config):
    """Return the first two lines of *old_config*, joined by a newline.

    If the text has fewer than two lines it is returned unchanged.
    """
    # Splitting at most twice is enough: only the first two pieces are kept.
    leading_lines = old_config.split("\n", 2)[:2]
    return "\n".join(leading_lines)
def conf_from_dict(config_dict):
    """Render the ``configs/config.j2`` template with *config_dict*.

    Args:
        config_dict: Mapping of variables handed to the template.

    Returns:
        The rendered configuration text.
    """
    with open("configs/config.j2", "r") as template_file:
        return Template(template_file.read()).render(config_dict)
def gen_conf(master_config, switch_config, header):
    """Render a switch configuration from the local YAML descriptions.

    Args:
        master_config: Site-wide settings (DHCP, SNMP, SNTP, DNS, RADIUS,
            managers, unauth redirect).
        switch_config: Description of the switch; must contain ``location``.
        header: Text passed to the template as ``header``.

    Returns:
        The rendered configuration text.
    """
    (
        interfaces,
        vlans,
        mac_ports,
        ra_ports,
        snooping_vlans,
    ) = gen_interfaces(switch_config)

    context = dict(
        header=header,
        hostname=switch_config.get("hostname"),
        dhcp_servers=master_config.get("dhcp_servers"),
        dhcpv6_servers=master_config.get("dhcpv6_servers"),
        snmp_user=master_config.get("snmp_user"),
        interfaces=interfaces,
        vlans=vlans,
        sntp=master_config.get("sntp_servers"),
        ipv4_managers=master_config.get("managers_v4", {}),
        ipv6_managers=master_config.get("managers_v6", {}),
        dns=master_config.get("dns"),
        location=switch_config["location"],
        radius_servers=master_config.get("radius_servers"),
        mac_based_ports=syntax_from_range(mac_ports),
        ra_guard_ports=syntax_from_range(ra_ports),
        unauth_redirect=master_config.get("unauth_redirect"),
        dhcp_snooping_vlans=syntax_from_range(snooping_vlans, vlan_syntax=True),
    )
    return conf_from_dict(context)
def gen_conf_re2o(re2o_config, header):
# Render a switch configuration from data fetched from the re2o API.
#
# re2o_config is the merged dict built by get_switch_from_re2o; header is
# passed to the template as "header". Returns the text rendered by
# conf_from_dict.
#
# NOTE(review): master_config is not a parameter; it is read as a module-level
# name, which is only assigned in the `if __name__ == "__main__":` section.
# NOTE(review): this copy of the file has lost its indentation; nesting of the
# loops and conditions below should be checked against the repository.
mgmt_utils = re2o_config.get("switchs_management_utils")
# IPv4 management subnets, keyed by network address.
ipv4_managers = dict()
for m in mgmt_utils.get("subnet"):
# debug output
print(m)
ipv4_managers[m.get("network")] = { "ip": m.get("network"), "subnet": m.get("netmask")}
ipv6_managers = dict()
# NOTE: subnet6 is a single subnet, not a list of subnets
m = mgmt_utils.get("subnet6")
ipv6_managers[m.get("network")] = { "ip": m.get("network"), "subnet": m.get("netmask")}
vlans = dict()
dhcp_snooping_vlans = list()
dhcpv6_snooping_vlans = list()
arp_protect = { "vlans": list() }
for vlan in re2o_config.get("vlans"):
vlan_id = vlan["vlan_id"]
# The re2o vlan dict itself is stored and extended with extra keys below.
vlans[vlan_id] = vlan
range_tagged = list()
range_untagged = list()
# collect the tagged and untagged ports
for port in re2o_config.get("ports"):
port_profile = port["get_port_profile"]
for v in port_profile["vlan_tagged"]:
# debug output
print(v)
if v["vlan_id"] == vlan_id:
range_tagged.append(port["port"])
# there is only one untagged vlan
v = port_profile["vlan_untagged"]
if v is not None and v["vlan_id"] == vlan_id:
range_untagged.append(port["port"])
vlans[vlan_id]["tagged"] = syntax_from_range(range_tagged)
vlans[vlan_id]["untagged"] = syntax_from_range(range_untagged)
# add the IPs to the vlans that have some
for address, iface in re2o_config.get("interfaces_subnet", dict()).items():
# yes, there is another list here, don't ask
for i in iface:
if i["vlan_id"] == vlan_id:
if vlans[vlan_id].get("ip") is None:
vlans[vlan_id]["ip"] = dict()
vlans[vlan_id]["ip"]["addr"] = address
vlans[vlan_id]["ip"]["subnet"] = i["netmask"]
# Unlike interfaces_subnet, each value here is indexed directly as a dict.
for address, iface in re2o_config.get("interfaces6_subnet", dict()).items():
if iface["vlan_id"] == vlan_id:
if vlans[vlan_id].get("ip") is None:
vlans[vlan_id]["ip"] = dict()
vlans[vlan_id]["ip"]["addr6"] = address
vlans[vlan_id]["ip"]["subnet6"] = iface["netmask_cidr"]
# record the vlans that need dhcp snooping (and dhcpv6 snooping / arp protect)
if vlan["dhcp_snooping"]:
dhcp_snooping_vlans.append(vlan_id)
if vlan["dhcpv6_snooping"]:
dhcpv6_snooping_vlans.append(vlan_id)
if vlan["arp_protect"]:
arp_protect["vlans"].append(vlan_id)
vlans[vlan_id]["ipv6_mld"] = vlan["mld"]
# collect the relevant information about the ports
mac_based_ports = list()
ra_guard_ports = list()
interfaces = list()
loop_protect = { "ports": list()}
for port in re2o_config.get("ports"):
port_profile = port["get_port_profile"]
if port_profile["ra_guard"]:
ra_guard_ports.append(port["port"])
if port_profile["loop_protect"]:
loop_protect["ports"].append(port["port"])
# The *_trust flags are the negation of the matching protection flag of the profile.
iface = {
"name": port["pretty_name"],
"number": port["port"],
"dhcp_trust": not port_profile["dhcp_snooping"],
"dhcpv6_trust": not port_profile["dhcpv6_snooping"],
"flowcontrol": port_profile["flow_control"],
"arp_trust": not port_profile["arp_protect"],
}
if port_profile["radius_type"] == "MAC-radius":
mac_based_ports.append(port["port"])
iface["mac_based"] = True
iface["addr_limit"] = port_profile["mac_limit"]
# master_config is the module-level name mentioned at the top of this function.
iface["logoff"] = master_config.get("radius_logoff")
interfaces.append(iface)
# Convert the collected lists to range strings for the template.
loop_protect["ports"] = syntax_from_range(loop_protect["ports"])
arp_protect["vlans"] = syntax_from_range(arp_protect["vlans"], vlan_syntax=True)
interfaces.sort(key=lambda x: x["number"])
# The same key is attached to every RADIUS server, IPv4 first then IPv6.
radius_key = re2o_config.get("get_radius_key_value")
radius_servers = [ {"ip": i, "secret": radius_key } for i in mgmt_utils["radius_servers"]["ipv4"] + mgmt_utils["radius_servers"]["ipv6"]]
config_dict = {
"header": header,
"location": re2o_config.get("switchbay").get("name"),
"hostname": re2o_config.get("short_name"),
"dhcp_servers": mgmt_utils.get("dhcp_servers").get("ipv4"),
"dhcpv6_servers": mgmt_utils.get("dhcp_servers").get("ipv6"),
"ipv4_managers": ipv4_managers,
"ipv6_managers": ipv6_managers,
"vlans": vlans,
"dhcp_snooping_vlans": syntax_from_range(dhcp_snooping_vlans, vlan_syntax=True),
"dhcpv6_snooping_vlans": syntax_from_range(dhcpv6_snooping_vlans, vlan_syntax=True),
"logging": mgmt_utils["log_servers"]["ipv4"] + mgmt_utils["log_servers"]["ipv6"],
"sntp": mgmt_utils["ntp_servers"]["ipv4"] + mgmt_utils["ntp_servers"]["ipv6"],
"dns": mgmt_utils["dns_recursive_servers"]["ipv4"] + mgmt_utils["dns_recursive_servers"]["ipv6"],
"radius_servers": radius_servers,
"snmp_user": master_config.get("snmp_user"),
"unauth_redirect": master_config.get("unauth_redirect"),
"mac_based_ports": syntax_from_range(mac_based_ports),
"ra_guard_ports": syntax_from_range(ra_guard_ports),
"loop_protect": loop_protect,
"arp_protect": arp_protect,
"interfaces": interfaces,
# NOTE(review): looks like a development leftover (a list of re2o key names);
# confirm the template does not use it before removing.
"FUCK": ['interfaces_subnet', 'ports', 'ipv6', 'ipv4', 'get_radius_key_value', 'interfaces6_subnet', 'list_modules']
}
# NOTE(review): debug dump of the whole re2o_config, which includes the
# "get_radius_key_value" entry read above — the key ends up on stdout.
pprint(re2o_config)
# pprint(re2o_config.get("interfaces_subnet"))
# pprint(re2o_config.get("list_modules"))
# raise RuntimeError()
return conf_from_dict(config_dict)
def connect_as_self(base_url, username=None):
    """Obtain a re2o API token, prompting for the password on the terminal.

    Args:
        base_url: Base URL of the API; ``"token-auth"`` is appended to it.
        username: Account to log in with. Defaults to the current unix user.

    Returns:
        The ``token`` field of the JSON answer.

    The process exits with status 1 when the server does not answer 200.
    """
    login = getpass.getuser() if username is None else username
    prompt = "Login to {} as {} with password :".format(base_url, login)
    credentials = {
        "username": login,
        "password": getpass.getpass(prompt),
    }
    response = requests.post(base_url + "token-auth", data=credentials)
    if response.status_code == 200:
        return response.json().get("token")
    logger.critical("Wrong login/password")
    exit(1)
def get_switch_from_results(results, switch_name):
    """Return the first entry of *results* whose ``short_name`` is *switch_name*.

    Args:
        results: Iterable of dicts.
        switch_name: Value to look for under the ``short_name`` key.

    Returns:
        The matching dict, or ``None`` when there is none.
    """
    matching = (
        entry for entry in results if entry.get("short_name") == switch_name
    )
    return next(matching, None)
def get_switch_from_re2o(re2o_instance, switch_name, re2o_user):
    """Fetch the description of one switch from a re2o instance.

    Args:
        re2o_instance: Base URL of the re2o instance; ``/api/`` is appended.
        switch_name: ``short_name`` of the switch to look for.
        re2o_user: Account used to log in (``None`` lets connect_as_self pick
            the current unix user).

    Returns:
        The switch entry from ``switchs/ports-config``, updated with the
        answer of ``preferences/optionaltopologie`` and with a ``vlans`` key
        holding the ``results`` of ``machines/vlan``.

    Raises:
        RuntimeError: If no page of ``switchs/ports-config`` contains a
            switch named *switch_name*.
    """
    base_url = "{}/api/".format(re2o_instance)
    token = connect_as_self(base_url, re2o_user)
    headers = {"Authorization": "Token " + token}

    # Fetch the configuration of the right switch, following the "next" links
    # of the paginated answer until it is found or the pages run out.
    sw_config = None
    page_url = base_url + "switchs/ports-config"
    while page_url and sw_config is None:
        page = requests.get(page_url, headers=headers).json()
        sw_config = get_switch_from_results(page["results"], switch_name)
        page_url = page.get("next")
    if sw_config is None:
        raise RuntimeError(
            "Switch {} not found on {}".format(switch_name, re2o_instance)
        )

    # Fetch the global settings (dhcp, managers, etc.).
    r = requests.get(base_url + "preferences/optionaltopologie", headers=headers)
    sw_config.update(r.json())

    # Fetch the list of vlans.
    r = requests.get(base_url + "machines/vlan", headers=headers)
    vlans = r.json().get("results")
    pprint(vlans)
    sw_config.update({"vlans": vlans})
    return sw_config
if __name__ == "__main__":
    # Command-line entry point: build the configuration of one switch, show
    # it (or its diff against the running configuration) and, unless
    # --dry-run is given or the operator declines, upload it as the startup
    # configuration.

    # Local imports: only the deployment step below needs them.
    import os
    import tempfile

    format_string = "%(asctime)s - %(levelname)s - %(message)s"
    stdout_handler = logging.StreamHandler()
    stdout_formatter = colorlog.ColoredFormatter("%(log_color)s{}".format(format_string))
    stdout_handler.setFormatter(stdout_formatter)
    logger.addHandler(stdout_handler)
    logger.setLevel(logging.DEBUG)

    parser = argparse.ArgumentParser(description="Script de déploiement de la configuration des switchs")
    parser.add_argument(
        "-d", "--dry-run", action="store_true",
        help="Génère la nouvelle configuration et affiche le diff "
             "sans tenter de l'appliquer",
    )
    parser.add_argument("-w", "--whole", action="store_true", help="Affiche la configuration en entier au lieu du diff")
    parser.add_argument("switch_name", type=str, help="Génère le template de ce switch")
    parser.add_argument("-H", "--host", type=str, required=False, help="Host sur lequel de déployer la configuration au lieu de l'adresse dans le template")
    parser.add_argument("-r", "--re2o", type=str, required=False, help="Si renseigné, la configuration sera récupérée depuis l'instance de re2o indiquée au lieu d'utiliser les fichiers yaml")
    parser.add_argument("--re2o-user", type=str, required=False, help="Permet de choisir l'user pour se connecter à l'api re2o, par défaut on prend l'user unix courant")
    parser.add_argument("-4", "--force-ipv4", action="store_true", help="Force le provisionning en ipv4")
    args = parser.parse_args()

    # The master configuration is needed in both modes.
    logger.debug("Loading master config")
    with open("configs/master.yml", "r") as master_file:
        master_config = yaml.load(master_file, yaml.Loader)
    if args.re2o:
        logger.debug("Loading config from re2o")
        re2o_config = get_switch_from_re2o(args.re2o, args.switch_name, args.re2o_user)
    else:
        logger.debug("Loading config for {}".format(args.switch_name))
        with open("configs/switches/{}.yml".format(args.switch_name), "r") as switch_file:
            switch_config = yaml.load(switch_file, yaml.Loader)

    # Address to connect to: explicit --host first, then the address from the
    # selected source (IPv6 preferred unless --force-ipv4), then a DNS name.
    if args.host:
        switch_address = args.host
    elif args.re2o:
        if args.force_ipv4:
            switch_address = re2o_config.get("ipv4")
        else:
            switch_address = re2o_config.get("ipv6") or re2o_config.get("ipv4")
    else:
        if args.force_ipv4:
            switch_address = switch_config.get("ipv4-addr")
        else:
            switch_address = switch_config.get("ipv6-addr") or switch_config.get("ipv4-addr")
    if switch_address is None:
        switch_address = "{}.switches.crans.org".format(args.switch_name)  # TODO: crans

    logger.info("Connecting to {} with address {}".format(args.switch_name, switch_address))
    # TODO: specify the key path
    session = connect_to_switch(switch_address, user="root", key=master_config.get("ssh_private_key"))
    old_config = sftp_read_file(session, "cfg/running-config").decode("utf-8")
    header = get_header(old_config)

    # Generate the configuration.
    logger.info("Generating configuration for {}".format(args.switch_name))
    if args.re2o:
        configuration = gen_conf_re2o(re2o_config, header)
    else:
        configuration = gen_conf(master_config, switch_config, header)

    if args.whole:
        # --whole: show the complete configuration instead of the diff.
        print(configuration)
    else:
        # Coloured unified diff between the running and the new configuration.
        diff_lines = difflib.unified_diff(
            old_config.split("\n"), configuration.split("\n"),
            fromfile='origin', tofile='new', lineterm="",
        )
        for line in diff_lines:
            if line.startswith("-"):
                termcolor.cprint(line, "red")
            elif line.startswith("+"):
                termcolor.cprint(line, "green")
            elif line.startswith("@"):
                termcolor.cprint(line, "yellow")
            else:
                print(line)

    if args.dry_run or input("Voulez-vous déployer la configuration sur le switch ? y/[n]").lower() not in ["o", "y"]:
        logger.info("Aborting deployement")
        raise SystemExit(0)

    # The rendered configuration may contain secrets (RADIUS keys are passed
    # to the template), so write it to a private temporary file instead of a
    # fixed, predictable path in /tmp, and remove it once uploaded.
    tmp_fd, tmp_path = tempfile.mkstemp(prefix="conf_", suffix=".tmp")
    try:
        with os.fdopen(tmp_fd, "w") as conf_tmp:
            conf_tmp.write(configuration)
        del session
        logger.info("Uploading configuration for {}".format(args.switch_name))
        # NOTE(review): the first session above logs in as "root" and this one
        # as "crans" — confirm that the difference is intended.
        session = connect_to_switch(switch_address, user="crans", key=master_config.get("ssh_private_key"))
        sftp_write_file(session, tmp_path, "cfg/startup-config")
    finally:
        os.remove(tmp_path)
    logger.info("Deployement done for {}".format(args.switch_name))
# these two lines, which are hard-coded in the crans templates, are removed
# they are two seemingly arbitrary multicast addresses; the reason is unclear
# https://gitlab.crans.org/nounous/scripts/-/commit/af2d158ff1a99dee2f032a81bf14de4144d2b82f
#-filter multicast 01005e-0000fb drop 1-52
#-filter multicast 333300-0000fb drop 1-52