Provisioning-switchs/generate-conf.py
2020-09-27 21:50:16 +02:00

300 lines
12 KiB
Python

import argparse
import difflib
import getpass
import logging
from pprint import pprint
import colorlog
import requests
import termcolor
import yaml
from jinja2 import Template
from core import connect_to_switch, sftp_read_file, sftp_write_file
# Root logger shared by every function in this module; its handler and level
# are configured in the ``__main__`` section at the bottom of the file.
logger = logging.getLogger()
def syntax_from_range(value, vlan_syntax=False):
    """Compress a collection of integers into a range string.

    Consecutive numbers are collapsed into ``first-last`` runs, e.g.
    ``[1, 2, 3, 5]`` becomes ``"1-3,5"``. Duplicates are ignored and the
    input does not need to be sorted.

    Args:
        value: Iterable of integers (port numbers or VLAN ids).
        vlan_syntax: When true, runs are separated by a space instead of a
            comma and a trailing space is appended to the result.

    Returns:
        ``None`` when ``value`` is an empty list, otherwise the range string.
    """
    if value == []:
        return None

    separator = " " if vlan_syntax else ","

    # Set iteration order is arbitrary (e.g. {1, 2, 8} iterates as 8, 1, 2 in
    # CPython), so the numbers must be sorted explicitly before grouping.
    runs = []
    for number in sorted(set(value)):
        if runs and number == runs[-1][1] + 1:
            # Extends the run currently being built.
            runs[-1][1] = number
        else:
            runs.append([number, number])

    syntax = separator.join(
        str(first) if first == last else "{}-{}".format(first, last)
        for first, last in runs
    )
    if vlan_syntax:
        syntax += " "
    return syntax
def range_from_syntax(value, vlan_syntax=False):
    """Expand a range string into the list of integers it describes.

    This is the inverse of ``syntax_from_range``: ``"1-3,5"`` becomes
    ``[1, 2, 3, 5]``. Empty tokens (for instance the one produced by the
    trailing space of the VLAN syntax) are ignored.

    Args:
        value: Range string, a single number, or ``None``. The value is
            converted with ``str()`` before parsing.
        vlan_syntax: When true, items are separated by whitespace instead of
            commas.

    Returns:
        The list of integers, in the order they appear in ``value``. An empty
        list when ``value`` is ``None``.

    Raises:
        RuntimeError: If an item contains more than one ``-``.
        ValueError: If a bound is not a valid integer.
    """
    if value is None:
        return []

    text = str(value)
    # str.split() without argument discards empty tokens, which makes the
    # trailing space emitted by the VLAN syntax harmless.
    tokens = text.split() if vlan_syntax else text.split(",")

    ret_range = []
    for token in tokens:
        token = token.strip()
        if not token:
            continue
        bounds = token.split("-")
        if len(bounds) == 1:
            ret_range.append(int(bounds[0]))
        elif len(bounds) == 2:
            ret_range.extend(range(int(bounds[0]), int(bounds[1]) + 1))
        else:
            raise RuntimeError("Invalid range syntax: {!r}".format(token))
    return ret_range
def gen_vlan(vlan_number, switch_config, dhcp_snooping_vlans):
    """Build the template data for one VLAN from ``configs/vlans/<n>.yml``.

    Args:
        vlan_number: VLAN id; used to locate the YAML file.
        switch_config: Switch configuration mapping. The ``ipv4-addr``,
            ``ipv4-subnet``, ``ipv6-addr`` and ``ipv6-subnet`` keys are read
            when the VLAN declares ``has_ip``.
        dhcp_snooping_vlans: List of VLAN ids with DHCP snooping enabled.
            It is modified in place (and also returned).

    Returns:
        A tuple ``(vlan, dhcp_snooping_vlans)`` where ``vlan`` is a dict with
        the keys ``name``, ``tagged``, ``untagged`` (both empty lists) and,
        when the VLAN has an IP, ``ip``.

    Raises:
        RuntimeError: If the VLAN declares ``has_ip`` but the switch
            configuration provides neither a complete IPv4 nor a complete
            IPv6 address/subnet pair.
    """
    vlan_path = "configs/vlans/{}.yml".format(vlan_number)
    # NOTE: yaml.Loader can build arbitrary Python objects; the VLAN files
    # must come from a trusted source.
    with open(vlan_path, "r") as vlan_file:
        vlan_config = yaml.load(vlan_file, yaml.Loader)

    vlan = {
        "name": vlan_config.get("name"),
        "tagged": list(),
        "untagged": list(),
    }

    # DHCP snooping is enabled unless the VLAN explicitly opts out.
    if vlan_config.get("dhcp_snooping", True):
        dhcp_snooping_vlans.append(int(vlan_number))

    if vlan_config.get("has_ip"):
        vlan["ip"] = dict()
        v4 = switch_config.get("ipv4-addr")
        v4_sub = switch_config.get("ipv4-subnet")
        if v4 is not None and v4_sub is not None:
            logger.debug("Adding {} {} to vlan {}".format(v4, v4_sub, vlan_number))
            vlan["ip"]["addr"] = v4
            vlan["ip"]["subnet"] = v4_sub
        v6 = switch_config.get("ipv6-addr")
        v6_sub = switch_config.get("ipv6-subnet")
        if v6 is not None and v6_sub is not None:
            logger.debug("Adding {}/{} to vlan {}".format(v6, v6_sub, vlan_number))
            vlan["ip"]["addr6"] = v6
            vlan["ip"]["subnet6"] = v6_sub
        if vlan["ip"] == {}:
            message = "Missing IPs or subnets configurations for vlan {}".format(vlan_number)
            logger.critical(message)
            raise RuntimeError(message)
    return vlan, dhcp_snooping_vlans
def gen_interfaces(switch_config):
    """Build the per-port and per-VLAN data used to render a configuration.

    Iterates over ``switch_config["interfaces"]`` (port range syntax mapped to
    port settings), loads the YAML profile named by each entry from
    ``configs/profiles/`` and expands the port range with
    ``range_from_syntax``.

    NOTE(review): the nesting below assumes that ports whose profile sets
    ``ignore_port`` are only left out of the ``interfaces`` list, while still
    counting as declared ports and still being attached to the profile's
    VLANs -- confirm against the repository.

    Returns:
        A tuple ``(interfaces, vlans, mac_based_ports, ra_guard_ports,
        dhcp_snooping_vlans)``. ``interfaces`` is a list of dicts sorted by
        their ``number`` key; ``vlans`` maps a VLAN id to the dict built by
        ``gen_vlan`` with ``tagged``/``untagged`` converted by
        ``syntax_from_range``; the other three are lists of integers.

    Raises:
        RuntimeError: If the highest declared port, or the number of distinct
            declared ports, differs from ``switch_config["nb_ports"]``.
    """
    dhcp_snooping_vlans = list()
    mac_based_ports = list()
    ra_guard_ports = list()
    ifaces_done = list()
    interfaces = list()
    vlans = dict()
    # Never read in this function.
    has_ip = False
    for iface, if_config in switch_config.get("interfaces").items():
        # The profile file is re-read for every entry of the interfaces mapping.
        if_profile = yaml.load(open("configs/profiles/{}.yml".format(if_config.get("profile"))), yaml.Loader)
        # The mapping key is a port range ("1-4,7"); expand it to port numbers.
        iface = range_from_syntax(iface)
        for iface_number in iface:
            if not if_profile.get("ignore_port", False):
                interface = {
                    "number": iface_number,
                }
                # Profiles with a generic name get a generated port name;
                # otherwise the entry must provide its own "name".
                generic = if_profile.get("generic_name", None)
                if generic is None:
                    interface["name"] = if_config["name"]
                else:
                    interface["name"] = "TODO {} {}".format(generic, iface_number)
                interface["mac_based"] = if_profile.get("mac_based", False)
                if interface["mac_based"]:
                    mac_based_ports.append(iface_number)
                # ra_guard defaults to enabled when the profile is silent.
                if if_profile.get("ra_guard", True):
                    ra_guard_ports.append(iface_number)
                interface["addr_limit"] = if_profile.get("addr_limit", 5)
                interface["logoff"] = if_profile.get("logoff", 3600)
                interfaces.append(interface)
            ifaces_done.append(iface_number)
            # VLAN entries are created on first use, then the port is
            # recorded as an untagged/tagged member.
            for vlan in range_from_syntax(if_profile.get("vlans").get("untagged")):
                if vlans.get(vlan) is None:
                    vlans[vlan], dhcp_snooping_vlans = gen_vlan(vlan, switch_config, dhcp_snooping_vlans)
                vlans[vlan]["untagged"].append(iface_number)
            for vlan in range_from_syntax(if_profile.get("vlans").get("tagged")):
                if vlans.get(vlan) is None:
                    vlans[vlan], dhcp_snooping_vlans = gen_vlan(vlan, switch_config, dhcp_snooping_vlans)
                vlans[vlan]["tagged"].append(iface_number)
    # Deduplicate so that the count below is a count of distinct ports.
    ifaces_done = set(ifaces_done)
    # Reformat the VLAN member lists into range syntax.
    for vlan, v_conf in vlans.items():
        vlans[vlan]["tagged"] = syntax_from_range(v_conf["tagged"])
        vlans[vlan]["untagged"] = syntax_from_range(v_conf["untagged"])
    # Both the highest port number and the number of distinct ports must
    # equal nb_ports. max() raises ValueError if no port was declared.
    if max(ifaces_done) != switch_config.get("nb_ports") or len(ifaces_done) != switch_config.get("nb_ports"):
        raise RuntimeError("Interfaces fucked")
    interfaces.sort(key=lambda x: x["number"])
    return interfaces, vlans, mac_based_ports, ra_guard_ports, dhcp_snooping_vlans
def get_header(old_config):
    """Return the first two lines of ``old_config``, joined by a newline."""
    # Splitting at most twice is enough: only the first two pieces are kept.
    leading_lines = old_config.split("\n", 2)[:2]
    return "\n".join(leading_lines)
def conf_from_dict(config_dict):
    """Render the ``configs/config.j2`` Jinja2 template with ``config_dict``.

    Returns the rendered configuration as a string.
    """
    with open("configs/config.j2", "r") as source:
        template_text = source.read()
    return Template(template_text).render(config_dict)
def gen_conf(master_config, switch_config, header):
    """Render the configuration of a switch described by the YAML files.

    Args:
        master_config: Site-wide settings (DHCP, SNTP, DNS, RADIUS servers...).
        switch_config: Settings of the switch itself; must contain
            ``location`` (a missing key raises ``KeyError``).
        header: Text passed to the template as ``header``.

    Returns:
        The rendered configuration, as returned by ``conf_from_dict``.
    """
    generated = gen_interfaces(switch_config)
    interfaces, vlans, mac_based_ports, ra_guard_ports, dhcp_snooping_vlans = generated

    context = dict(
        header=header,
        hostname=switch_config.get("hostname"),
        dhcp_servers=master_config.get("dhcp_servers"),
        dhcpv6_servers=master_config.get("dhcpv6_servers"),
        snmp_user=master_config.get("snmp_user"),
        interfaces=interfaces,
        vlans=vlans,
        sntp=master_config.get("sntp_servers"),
        ipv4_managers=master_config.get("managers_v4", {}),
        ipv6_managers=master_config.get("managers_v6", {}),
        dns=master_config.get("dns"),
        location=switch_config["location"],
        radius_servers=master_config.get("radius_servers"),
        mac_based_ports=syntax_from_range(mac_based_ports),
        ra_guard_ports=syntax_from_range(ra_guard_ports),
        unauth_redirect=master_config.get("unauth_redirect"),
        dhcp_snooping_vlans=syntax_from_range(dhcp_snooping_vlans, vlan_syntax=True),
    )
    return conf_from_dict(context)
def gen_conf_re2o(re2o_config, header):
    """Placeholder for generating a configuration from re2o data.

    The function is not implemented yet: it prints the keys available in
    ``re2o_config`` (to help writing the implementation) and then raises.
    The intended result is the output of ``conf_from_dict`` for a context
    built from ``re2o_config`` and ``header``.

    Args:
        re2o_config: Mapping describing the switch, as returned by re2o.
        header: Text meant to be passed to the template as ``header``.

    Raises:
        NotImplementedError: Always. It is a subclass of ``RuntimeError``,
            which is what this function raised previously.
    """
    print(re2o_config.keys())
    raise NotImplementedError("Generating the configuration from re2o is not implemented yet")
def connect_as_self(base_url, username=None):
    """Get a re2o token, by default for the current unix user.

    The password is asked interactively and posted together with the
    username to ``<base_url>token-auth``.

    Args:
        base_url: Base URL of the re2o API; the endpoint name is appended
            as is, so it is expected to end with ``/``.
        username: Login to use; defaults to ``getpass.getuser()``.

    Returns:
        The ``token`` field of the JSON response (``None`` if it is absent).

    Raises:
        SystemExit: With code 1 when the server does not answer 200.
    """
    if username is None:
        username = getpass.getuser()
    password = getpass.getpass("Login to {} as {} with password :".format(base_url, username))
    r = requests.post(
        base_url + "token-auth",
        data={
            "username": username,
            "password": password,
        },
    )
    if r.status_code != 200:
        logger.critical("Wrong login/password")
        # exit() is an interactive helper injected by the site module;
        # raising SystemExit has the same effect and is always available.
        raise SystemExit(1)
    return r.json().get("token")
def get_switch_from_results(results, switch_name):
    """Find the entry describing ``switch_name`` in a page of results.

    Args:
        results: Iterable of mappings, each expected to expose the name of
            the switch under the ``short_name`` key.
        switch_name: Name of the switch to look for.

    Returns:
        The first entry whose ``short_name`` equals ``switch_name``, or
        ``None`` when there is no such entry.
    """
    return next(
        (entry for entry in results if entry.get("short_name") == switch_name),
        None,
    )
def get_switch_from_re2o(re2o_instance, switch_name, re2o_user):
    """Fetch the configuration of ``switch_name`` from a re2o instance.

    Authenticates with ``connect_as_self`` and walks the paginated
    ``switchs/ports-config`` endpoint, following the ``next`` link of each
    page until the switch is found or there is no further page.

    Args:
        re2o_instance: Base URL of the re2o instance, without trailing ``/``.
        switch_name: Name of the switch to look for.
        re2o_user: Login used for the API, or ``None`` for the current unix
            user.

    Returns:
        The entry returned by ``get_switch_from_results``, or ``None`` when
        no page contains the switch.

    Raises:
        requests.HTTPError: If a page request returns an error status.
    """
    base_url = "{}/api/".format(re2o_instance)
    token = connect_as_self(base_url, re2o_user)
    headers = {"Authorization": "Token " + token}

    next_url = base_url + "switchs/ports-config"
    while next_url:
        r = requests.get(next_url, headers=headers)
        r.raise_for_status()
        page = r.json()  # parse each page only once
        sw_config = get_switch_from_results(page["results"], switch_name)
        if sw_config is not None:
            return sw_config
        next_url = page.get("next")
    return None
if __name__ == "__main__":
    # Entry point: generate the configuration of one switch, show the diff
    # against its running configuration and, after confirmation, upload the
    # result as the startup configuration.
    import os
    import tempfile

    # Colorized log output. StreamHandler() without argument writes to stderr.
    format_string = "%(asctime)s - %(levelname)s - %(message)s"
    stdout_handler = logging.StreamHandler()
    stdout_formatter = colorlog.ColoredFormatter("%(log_color)s{}".format(format_string))
    stdout_handler.setFormatter(stdout_formatter)
    logger.addHandler(stdout_handler)
    logger.setLevel(logging.DEBUG)

    parser = argparse.ArgumentParser(description="Script de déploiement de la configuration des switchs")
    parser.add_argument(
        "-d", "--dry-run", action="store_true",
        help="Génère la nouvelle configuration et affiche le diff "
             "sans tenter de l'appliquer",
    )
    parser.add_argument("-w", "--whole", action="store_true", help="Affiche la configuration en entier au lieu du diff")
    parser.add_argument("switch_name", type=str, help="Génère le template de ce switch")
    parser.add_argument("-H", "--host", type=str, required=False, help="Host sur lequel de déployer la configuration au lieu de l'adresse dans le template")
    parser.add_argument("-r", "--re2o", type=str, required=False, help="Si renseigné, la configuration sera récupérée depuis l'instance de re2o indiquée au lieu d'utiliser les fichiers yaml")
    parser.add_argument("--re2o-user", type=str, required=False, help="Permet de choisir l'user pour se connecter à l'api re2o, par défaut on prend l'user unix courant")
    parser.add_argument("-4", "--force-ipv4", action="store_true", help="Force le provisionning en ipv4")
    args = parser.parse_args()

    # The master configuration is needed whatever the source of the switch
    # configuration is, so it is loaded once, before branching.
    logger.debug("Loading master config")
    with open("configs/master.yml", "r") as master_file:
        master_config = yaml.load(master_file, yaml.Loader)

    if args.re2o:
        logger.debug("Loading config from re2o")
        re2o_config = get_switch_from_re2o(args.re2o, args.switch_name, args.re2o_user)
        if re2o_config is None:
            logger.critical("Switch {} not found on {}".format(args.switch_name, args.re2o))
            raise SystemExit(1)
    else:
        logger.debug("Loading config for {}".format(args.switch_name))
        with open("configs/switches/{}.yml".format(args.switch_name), "r") as switch_file:
            switch_config = yaml.load(switch_file, yaml.Loader)

    # Address used to reach the switch: explicit host first, then the
    # configured address (IPv6 preferred unless IPv4 is forced).
    if args.host:
        switch_address = args.host
    elif args.re2o:
        if args.force_ipv4:
            switch_address = re2o_config.get("ipv4")
        else:
            switch_address = re2o_config.get("ipv6") or re2o_config.get("ipv4")
    else:
        if args.force_ipv4:
            switch_address = switch_config.get("ipv4-addr")
        else:
            switch_address = switch_config.get("ipv6-addr") or switch_config.get("ipv4-addr")
    if switch_address is None:
        # Fall back to the DNS name of the switch.
        switch_address = "{}.switches.crans.org".format(args.switch_name)  # TODO: crans

    logger.info("Connecting to {} with address {}".format(args.switch_name, switch_address))
    # TODO: specify the path of the key
    session = connect_to_switch(switch_address, user="root", key=master_config.get("ssh_private_key"))
    old_config = sftp_read_file(session, "cfg/running-config").decode("utf-8")
    header = get_header(old_config)

    # Generate the configuration.
    logger.info("Generating configuration for {}".format(args.switch_name))
    if args.re2o:
        configuration = gen_conf_re2o(re2o_config, header)
    else:
        configuration = gen_conf(master_config, switch_config, header)

    # Show the diff between the running and the generated configuration.
    for line in difflib.unified_diff(old_config.split("\n"), configuration.split("\n"), fromfile='origin', tofile='new', lineterm=""):
        if line.startswith("-"):
            termcolor.cprint(line, "red")
        elif line.startswith("+"):
            termcolor.cprint(line, "green")
        elif line.startswith("@"):
            termcolor.cprint(line, "yellow")
        else:
            print(line)

    if args.dry_run or input("Voulez-vous déployer la configuration sur le switch ? y/[n]").lower() not in ["o","y"]:
        logger.info("Aborting deployement")
        raise SystemExit(0)

    # Write the configuration to a private, uniquely named temporary file
    # instead of a predictable path in /tmp, and remove it afterwards.
    with tempfile.NamedTemporaryFile("w", prefix="conf_tmp_", delete=False) as conf_tmp:
        conf_tmp.write(configuration)
        conf_tmp_path = conf_tmp.name
    try:
        del session
        logger.info("Uploading configuration for {}".format(args.switch_name))
        # NOTE(review): the read above connects as "root" while the upload
        # connects as "crans" -- confirm this is intended.
        session = connect_to_switch(switch_address, user="crans", key=master_config.get("ssh_private_key"))
        sftp_write_file(session, conf_tmp_path, "cfg/startup-config")
    finally:
        os.remove(conf_tmp_path)
    logger.info("Deployement done for {}".format(args.switch_name))