Provisioning-switchs/generate-conf.py
2020-08-14 14:53:17 +02:00

227 lines
9.5 KiB
Python

import argparse
import difflib
import logging
from pprint import pprint
import colorlog
import termcolor
import yaml
from jinja2 import Template
from core import connect_to_switch, sftp_read_file, sftp_write_file
# Root logger (getLogger() is called without a name); its handler and level
# are configured in the ``__main__`` section at the bottom of this file.
logger = logging.getLogger()
def syntax_from_range(value, vlan_syntax=False):
    """Compress a list of integers into compact range notation.

    Consecutive numbers are collapsed into ``first-last`` runs, so
    ``[1, 2, 3, 5]`` becomes ``"1-3,5"``.

    Args:
        value: List of integers (port or VLAN numbers). Duplicates are
            ignored and the input order does not matter.
        vlan_syntax: When true, runs are separated by a space instead of a
            comma and a trailing space is appended to the result.

    Returns:
        The range string, or ``None`` when ``value`` is an empty list.
    """
    if value == []:
        return None

    separator = " " if vlan_syntax else ","

    # A set has no guaranteed iteration order, so sort explicitly before
    # looking for consecutive numbers; otherwise [8, 1] could come out as
    # "8,1" and runs could be split.
    runs = []
    for number in sorted(set(value)):
        if runs and number == runs[-1][1] + 1:
            # Extends the current run of consecutive numbers.
            runs[-1][1] = number
        else:
            runs.append([number, number])

    syntax = separator.join(
        str(first) if first == last else "{}-{}".format(first, last)
        for first, last in runs
    )
    if vlan_syntax:
        syntax += " "
    return syntax
def range_from_syntax(value, vlan_syntax=False):
    """Expand compact range notation into a list of integers.

    This is the inverse of ``syntax_from_range``: ``"1-3,5"`` becomes
    ``[1, 2, 3, 5]``.

    Args:
        value: Range string (or a bare integer, which is converted with
            ``str``). ``None`` means "no range".
        vlan_syntax: When true, items are separated by spaces instead of
            commas.

    Returns:
        The expanded list of integers, in the order they appear in
        ``value``. An empty list when ``value`` is ``None``.

    Raises:
        RuntimeError: If an item contains more than one ``-``.
        ValueError: If a bound is not an integer.
    """
    if value is None:
        return []

    separator = " " if vlan_syntax else ","
    ret_range = []
    for token in str(value).split(separator):
        # Skip empty items so that the trailing space emitted by
        # syntax_from_range(..., vlan_syntax=True) parses back cleanly
        # instead of failing on int("").
        if not token.strip():
            continue
        bounds = token.split("-")
        if len(bounds) == 2:
            ret_range.extend(range(int(bounds[0]), int(bounds[1]) + 1))
        elif len(bounds) == 1:
            ret_range.append(int(bounds[0]))
        else:
            raise RuntimeError(
                "Invalid range syntax {!r} in {!r}".format(token, value)
            )
    return ret_range
def gen_vlan(vlan_number, switch_config, dhcp_snooping_vlans):
    """Build the template data for one VLAN from ``configs/vlans/<n>.yml``.

    Args:
        vlan_number: VLAN number; selects the YAML file to load.
        switch_config: Switch configuration mapping; its ``ipv4-addr``,
            ``ipv4-subnet``, ``ipv6-addr`` and ``ipv6-subnet`` keys are used
            when the VLAN declares ``has_ip``.
        dhcp_snooping_vlans: List that is extended in place with
            ``vlan_number`` unless the VLAN sets ``dhcp_snooping`` to a
            false value.

    Returns:
        A ``(vlan, dhcp_snooping_vlans)`` tuple. ``vlan`` holds ``name``,
        empty ``tagged``/``untagged`` lists and, for VLANs with ``has_ip``,
        an ``ip`` mapping.

    Raises:
        RuntimeError: If the VLAN declares ``has_ip`` but the switch
            configuration provides neither a complete IPv4 nor a complete
            IPv6 address/subnet pair.
    """
    # NOTE(review): yaml.Loader can construct arbitrary Python objects; this
    # is only acceptable if the configs/ tree is trusted - confirm.
    # The context manager closes the file, which the bare open() never did.
    with open("configs/vlans/{}.yml".format(vlan_number), "r") as vlan_file:
        vlan_config = yaml.load(vlan_file, yaml.Loader)

    vlan = {
        "name": vlan_config.get("name"),
        "tagged": list(),
        "untagged": list(),
    }
    # DHCP snooping is enabled by default; a VLAN must opt out explicitly.
    if vlan_config.get("dhcp_snooping", True):
        dhcp_snooping_vlans.append(int(vlan_number))

    if vlan_config.get("has_ip"):
        vlan["ip"] = dict()
        v4 = switch_config.get("ipv4-addr")
        v4_sub = switch_config.get("ipv4-subnet")
        if v4 is not None and v4_sub is not None:
            logger.debug("Adding {} {} to vlan {}".format(v4, v4_sub, vlan_number))
            vlan["ip"]["addr"] = v4
            vlan["ip"]["subnet"] = v4_sub
        v6 = switch_config.get("ipv6-addr")
        v6_sub = switch_config.get("ipv6-subnet")
        if v6 is not None and v6_sub is not None:
            logger.debug("Adding {}/{} to vlan {}".format(v6, v6_sub, vlan_number))
            vlan["ip"]["addr6"] = v6
            vlan["ip"]["subnet6"] = v6_sub
        if vlan["ip"] == {}:
            message = "Missing IPs or subnets configurations for vlan {}".format(vlan_number)
            logger.critical(message)
            # Carry the message in the exception too, so the traceback is
            # self-explanatory even when logging is not configured.
            raise RuntimeError(message)
    return vlan, dhcp_snooping_vlans
def gen_interfaces(switch_config):
    """Compute per-port and per-VLAN template data for a switch.

    Each entry of ``switch_config["interfaces"]`` maps a port range (in
    ``range_from_syntax`` notation) to a mapping naming a profile; the
    profile is loaded from ``configs/profiles/<profile>.yml``.

    Args:
        switch_config: Switch configuration mapping. Must contain
            ``interfaces`` and ``nb_ports``.

    Returns:
        A tuple ``(interfaces, vlans, mac_based_ports, ra_guard_ports,
        dhcp_snooping_vlans)`` where ``interfaces`` is a list of port
        mappings sorted by port number, ``vlans`` maps VLAN numbers to the
        data built by ``gen_vlan`` (with ``tagged``/``untagged`` converted to
        range strings), and the remaining items are lists of numbers.

    Raises:
        RuntimeError: If the declared ports do not match ``nb_ports``
            (different count of distinct ports, or a different highest port
            number), including when no port is declared at all.
    """
    dhcp_snooping_vlans = []
    mac_based_ports = []
    ra_guard_ports = []
    ifaces_done = []
    interfaces = []
    vlans = {}

    for iface, if_config in switch_config.get("interfaces").items():
        profile_path = "configs/profiles/{}.yml".format(if_config.get("profile"))
        # NOTE(review): yaml.Loader can construct arbitrary Python objects;
        # this is only acceptable if the configs/ tree is trusted - confirm.
        with open(profile_path) as profile_file:
            if_profile = yaml.load(profile_file, yaml.Loader)

        # The VLAN memberships depend only on the profile, so parse them once
        # per entry instead of once per port. A profile without a "vlans"
        # section simply contributes no membership.
        profile_vlans = if_profile.get("vlans") or {}
        memberships = (
            ("untagged", range_from_syntax(profile_vlans.get("untagged"))),
            ("tagged", range_from_syntax(profile_vlans.get("tagged"))),
        )

        for iface_number in range_from_syntax(iface):
            if not if_profile.get("ignore_port", False):
                interface = {
                    "number": iface_number,
                }
                generic = if_profile.get("generic_name", None)
                if generic is None:
                    interface["name"] = if_config["name"]
                else:
                    interface["name"] = "TODO {} {}".format(generic, iface_number)
                interface["mac_based"] = if_profile.get("mac_based", False)
                if interface["mac_based"]:
                    mac_based_ports.append(iface_number)
                # RA guard is enabled unless the profile opts out.
                if if_profile.get("ra_guard", True):
                    ra_guard_ports.append(iface_number)
                interface["addr_limit"] = if_profile.get("addr_limit", 5)
                interface["logoff"] = if_profile.get("logoff", 3600)
                interfaces.append(interface)

            # NOTE(review): ignored ports still count toward the completeness
            # check below and still receive their VLAN memberships - confirm
            # this is the intended meaning of "ignore_port".
            ifaces_done.append(iface_number)
            for mode, vlan_numbers in memberships:
                for vlan in vlan_numbers:
                    if vlans.get(vlan) is None:
                        vlans[vlan], dhcp_snooping_vlans = gen_vlan(
                            vlan, switch_config, dhcp_snooping_vlans
                        )
                    vlans[vlan][mode].append(iface_number)

    ifaces_done = set(ifaces_done)

    # Convert the per-VLAN port lists to range strings.
    for v_conf in vlans.values():
        v_conf["tagged"] = syntax_from_range(v_conf["tagged"])
        v_conf["untagged"] = syntax_from_range(v_conf["untagged"])

    nb_ports = switch_config.get("nb_ports")
    # Check emptiness first: max() on an empty set would raise ValueError.
    if not ifaces_done:
        raise RuntimeError(
            "No interface declared (nb_ports={})".format(nb_ports)
        )
    if max(ifaces_done) != nb_ports or len(ifaces_done) != nb_ports:
        raise RuntimeError(
            "Declared interfaces do not match nb_ports={}: {} distinct ports "
            "declared, highest is {}".format(
                nb_ports, len(ifaces_done), max(ifaces_done)
            )
        )

    interfaces.sort(key=lambda x: x["number"])
    return interfaces, vlans, mac_based_ports, ra_guard_ports, dhcp_snooping_vlans
def gen_conf(master_config, switch_config, old_config):
    """Render the complete switch configuration from ``configs/config.j2``.

    Args:
        master_config: Site-wide settings mapping (servers, managers, ...).
        switch_config: Settings mapping of the switch being generated; must
            contain ``location``.
        old_config: Text of the configuration currently on the switch. Its
            first two lines are passed to the template as ``header``.

    Returns:
        The rendered configuration as a string.
    """
    # Only the first two lines of the existing configuration are reused.
    preserved_header = "\n".join(old_config.split("\n", 2)[:2])

    generated = gen_interfaces(switch_config)
    port_list, vlan_map, mac_ports, ra_ports, snooping_vlans = generated

    from_master = master_config.get
    context = {
        "header": preserved_header,
        "hostname": switch_config.get("hostname"),
        "dhcp_servers": from_master("dhcp_servers"),
        "dhcpv6_servers": from_master("dhcpv6_servers"),
        "snmp_user": from_master("snmp_user"),
        "interfaces": port_list,
        "vlans": vlan_map,
        "sntp": from_master("sntp_servers"),
        "ipv4_managers": from_master("managers_v4", {}),
        "ipv6_managers": from_master("managers_v6", {}),
        "dns": from_master("dns"),
        "location": switch_config["location"],
        "radius_servers": from_master("radius_servers"),
        "mac_based_ports": syntax_from_range(mac_ports),
        "ra_guard_ports": syntax_from_range(ra_ports),
        "unauth_redirect": from_master("unauth_redirect"),
        "dhcp_snooping_vlans": syntax_from_range(snooping_vlans, vlan_syntax=True),
    }

    with open("configs/config.j2", "r") as template_file:
        template_source = template_file.read()
    return Template(template_source).render(context)
# Command-line entry point: fetch the running configuration of a switch,
# generate the new one, show the difference and optionally upload it as the
# switch's startup configuration.
if __name__ == "__main__":
    # Imported here because only this script section needs them.
    import os
    import sys
    import tempfile

    format_string = "%(asctime)s - %(levelname)s - %(message)s"
    stdout_handler = logging.StreamHandler()
    stdout_formatter = colorlog.ColoredFormatter("%(log_color)s{}".format(format_string))
    stdout_handler.setFormatter(stdout_formatter)
    logger.addHandler(stdout_handler)
    logger.setLevel(logging.DEBUG)

    parser = argparse.ArgumentParser(description="Script de déploiement de la configuration des switchs")
    parser.add_argument(
        "-d", "--dry-run", action="store_true",
        help="Génère la nouvelle configuration et affiche le diff "
             "sans tenter de l'appliquer",
    )
    parser.add_argument("-w", "--whole", action="store_true", help="Affiche la configuration en entier au lieu du diff")
    parser.add_argument("switch_name", type=str, help="Génère le template de ce switch")
    parser.add_argument("-H", "--host", type=str, required=False, help="Host sur lequel de déployer la configuration au lieu de l'adresse dans le template")
    args = parser.parse_args()

    # NOTE(review): yaml.Loader can construct arbitrary Python objects; this
    # is only acceptable if the configs/ tree is trusted - confirm.
    logger.debug("Loading master config")
    with open("configs/master.yml", "r") as master_file:
        master_config = yaml.load(master_file, yaml.Loader)
    logger.debug("Loading config for {}".format(args.switch_name))
    with open("configs/switches/{}.yml".format(args.switch_name), "r") as switch_file:
        switch_config = yaml.load(switch_file, yaml.Loader)

    # Address priority: --host, then the configured IPv6/IPv4 address, then
    # the conventional DNS name of the switch.
    if args.host:
        switch_address = args.host
    else:
        switch_address = switch_config.get("ipv6-addr") or switch_config.get("ipv4-addr")
    if switch_address is None:
        switch_address = "{}.switches.crans.org".format(args.switch_name)

    # The running configuration is needed even for a dry run: it provides the
    # header of the new configuration and the left-hand side of the diff.
    logger.info("Connecting to {} with address {}".format(args.switch_name, switch_address))
    session = connect_to_switch(switch_address, user="root", key=master_config.get("ssh_private_key"))
    old_config = sftp_read_file(session, "cfg/running-config").decode("utf-8")

    logger.info("Generating configuration for {}".format(args.switch_name))
    configuration = gen_conf(master_config, switch_config, old_config)

    if args.whole:
        # --whole was parsed but never honoured before: print the complete
        # generated configuration instead of the diff, as its help promises.
        print(configuration)
    else:
        for line in difflib.unified_diff(old_config.split("\n"), configuration.split("\n"), fromfile='origin', tofile='new', lineterm=""):
            if line.startswith("-"):
                termcolor.cprint(line, "red")
            elif line.startswith("+"):
                termcolor.cprint(line, "green")
            elif line.startswith("@"):
                termcolor.cprint(line, "yellow")
            else:
                print(line)

    if args.dry_run or input("Voulez-vous déployer la configuration sur le switch ? y/[n]").lower() not in ["o", "y"]:
        logger.info("Aborting deployement")
        sys.exit(0)

    # Use a private, unpredictable temporary file rather than a fixed
    # /tmp/conf_tmp path: the fixed path could be pre-created by another
    # local user and was never removed afterwards.
    with tempfile.NamedTemporaryFile("w", prefix="switch-conf-", delete=False) as conf_tmp:
        conf_tmp.write(configuration)
        conf_tmp_path = conf_tmp.name
    try:
        del session
        logger.info("Uploading configuration for {}".format(args.switch_name))
        # NOTE(review): the download above uses user "root" while the upload
        # uses user "crans" - confirm this asymmetry is intended.
        session = connect_to_switch(switch_address, user="crans", key=master_config.get("ssh_private_key"))
        # Presumably sftp_write_file takes a local path then a remote path,
        # as in the original call - verify against core.
        sftp_write_file(session, conf_tmp_path, "cfg/startup-config")
    finally:
        os.unlink(conf_tmp_path)
    logger.info("Deployement done for {}".format(args.switch_name))