224 lines
9.5 KiB
Python
224 lines
9.5 KiB
Python
import argparse
|
|
import difflib
|
|
import logging
|
|
|
|
from pprint import pprint
|
|
|
|
import colorlog
|
|
import yaml
|
|
|
|
from jinja2 import Template
|
|
|
|
from core import connect_to_switch, sftp_read_file, sftp_write_file
|
|
|
|
logger = logging.getLogger()
|
|
|
|
def syntax_from_range(value, vlan_syntax=False):
|
|
if value == []:
|
|
return None
|
|
value = set(value)
|
|
prec = None
|
|
syntax = ""
|
|
in_range = False
|
|
for i in value:
|
|
print(i, type(i))
|
|
if prec is None or prec != i - 1:
|
|
if syntax == "":
|
|
syntax = str(i)
|
|
elif in_range:
|
|
if vlan_syntax:
|
|
syntax += "-{} {}".format(prec,i)
|
|
else:
|
|
syntax += "-{},{}".format(prec,i)
|
|
in_range = False
|
|
else:
|
|
if vlan_syntax:
|
|
syntax += " {}".format(i)
|
|
else:
|
|
syntax += ",{}".format(i)
|
|
in_range = False
|
|
elif i == max(value):
|
|
syntax += "-{}".format(i)
|
|
else:
|
|
in_range = True
|
|
prec = i
|
|
if vlan_syntax:
|
|
syntax += " "
|
|
return syntax
|
|
|
|
def range_from_syntax(value, vlan_syntax=False):
|
|
if value is None:
|
|
return []
|
|
ret_range = list()
|
|
if vlan_syntax:
|
|
value = str(value).split(" ")
|
|
else:
|
|
value = str(value).split(",")
|
|
for v in value:
|
|
v = v.split("-")
|
|
if len(v) == 2:
|
|
v = list(range(int(v[0]), int(v[1])+1))
|
|
elif len(v) == 1:
|
|
v = [int(v[0])]
|
|
else:
|
|
raise RuntimeError("You fucked")
|
|
ret_range += v
|
|
return ret_range
|
|
|
|
def gen_vlan(vlan_number, switch_config, dhcp_snooping_vlans):
|
|
vlan_config = yaml.load(open("configs/vlans/{}.yml".format(vlan_number), "r"), yaml.Loader)
|
|
vlan = {
|
|
"name": vlan_config.get("name"),
|
|
"tagged": list(),
|
|
"untagged": list(),
|
|
}
|
|
if vlan_config.get("dhcp_snooping", True):
|
|
dhcp_snooping_vlans.append(int(vlan_number))
|
|
if vlan_config.get("has_ip"):
|
|
vlan["ip"] = dict()
|
|
v4 = switch_config.get("ipv4-addr")
|
|
v4_sub = switch_config.get("ipv4-subnet")
|
|
if v4 is not None and v4_sub is not None:
|
|
logger.debug("Adding {} {} to vlan {}".format(v4, v4_sub, vlan_number))
|
|
vlan["ip"]["addr"] = v4
|
|
vlan["ip"]["subnet"] = v4_sub
|
|
v6 = switch_config.get("ipv6-addr")
|
|
v6_sub = switch_config.get("ipv6-subnet")
|
|
if v6 is not None and v6_sub is not None:
|
|
logger.debug("Adding {}/{} to vlan {}".format(v6, v6_sub, vlan_number))
|
|
vlan["ip"]["addr6"] = v6
|
|
vlan["ip"]["subnet6"] = v6_sub
|
|
if vlan["ip"] == {}:
|
|
logger.critical("Missing IPs or subnets configurations for vlan {}".format(vlan_number))
|
|
raise RuntimeError()
|
|
return vlan, dhcp_snooping_vlans
|
|
|
|
def gen_interfaces(switch_config):
|
|
dhcp_snooping_vlans = list()
|
|
mac_based_ports = list()
|
|
ra_guard_ports = list()
|
|
ifaces_done = list()
|
|
interfaces = list()
|
|
vlans = dict()
|
|
has_ip = False
|
|
for iface, if_config in switch_config.get("interfaces").items():
|
|
if_profile = yaml.load(open("configs/profiles/{}.yml".format(if_config.get("profile"))), yaml.Loader)
|
|
iface = range_from_syntax(iface)
|
|
for iface_number in iface:
|
|
print("if_num", iface_number)
|
|
if not if_profile.get("ignore_port", False):
|
|
interface = {
|
|
"number": iface_number,
|
|
}
|
|
generic = if_profile.get("generic_name", None)
|
|
if generic is None:
|
|
interface["name"] = if_config["name"]
|
|
else:
|
|
interface["name"] = "TODO {} {}".format(generic, iface_number)
|
|
interface["mac_based"] = if_profile.get("mac_based", False)
|
|
if interface["mac_based"]:
|
|
mac_based_ports.append(iface_number)
|
|
if if_profile.get("ra_guard", True):
|
|
ra_guard_ports.append(iface_number)
|
|
interface["addr_limit"] = if_profile.get("addr_limit", 5)
|
|
interface["logoff"] = if_profile.get("logoff", 3600)
|
|
interfaces.append(interface)
|
|
ifaces_done.append(iface_number)
|
|
for vlan in range_from_syntax(if_profile.get("vlans").get("untagged")):
|
|
if vlans.get(vlan) is None:
|
|
vlans[vlan], dhcp_snooping_vlans = gen_vlan(vlan, switch_config, dhcp_snooping_vlans)
|
|
vlans[vlan]["untagged"].append(iface_number)
|
|
for vlan in range_from_syntax(if_profile.get("vlans").get("tagged")):
|
|
if vlans.get(vlan) is None:
|
|
vlans[vlan], dhcp_snooping_vlans = gen_vlan(vlan, switch_config, dhcp_snooping_vlans)
|
|
vlans[vlan]["tagged"].append(iface_number)
|
|
ifaces_done = set(ifaces_done)
|
|
# on reformatte les interfaces des vlans
|
|
for vlan, v_conf in vlans.items():
|
|
vlans[vlan]["tagged"] = syntax_from_range(v_conf["tagged"])
|
|
vlans[vlan]["untagged"] = syntax_from_range(v_conf["untagged"])
|
|
|
|
if max(ifaces_done) != switch_config.get("nb_ports") or len(ifaces_done) != switch_config.get("nb_ports"):
|
|
raise RuntimeError("Interfaces fucked")
|
|
interfaces.sort(key=lambda x: x["number"])
|
|
return interfaces, vlans, mac_based_ports, ra_guard_ports, dhcp_snooping_vlans
|
|
|
|
def gen_conf(master_config, switch_config, old_config):
|
|
header = "\n".join(old_config.split("\n")[:2])
|
|
interfaces, vlans, mac_based_ports, ra_guard_ports, dhcp_snooping_vlans = gen_interfaces(switch_config)
|
|
print("mac", set(mac_based_ports))
|
|
print("ra", set(ra_guard_ports))
|
|
print("dhcp", set(dhcp_snooping_vlans))
|
|
config_dict = {
|
|
"header": header,
|
|
"hostname": switch_config.get("hostname"),
|
|
"dhcp_servers": master_config.get("dhcp_servers"),
|
|
"dhcpv6_servers": master_config.get("dhcpv6_servers"),
|
|
"snmp_user": master_config.get("snmp_user"),
|
|
"interfaces": interfaces,
|
|
"vlans": vlans,
|
|
"sntp": master_config.get("sntp_servers"),
|
|
"ipv4_managers": master_config.get("managers_v4", {}),
|
|
"ipv6_managers": master_config.get("managers_v6", {}),
|
|
"dns": master_config.get("dns"),
|
|
"location": switch_config["location"],
|
|
"radius_servers": master_config.get("radius_servers"),
|
|
"mac_based_ports": syntax_from_range(mac_based_ports),
|
|
"ra_guard_ports": syntax_from_range(ra_guard_ports),
|
|
"unauth_redirect": master_config.get("unauth_redirect"),
|
|
"dhcp_snooping_vlans": syntax_from_range(dhcp_snooping_vlans, vlan_syntax=True),
|
|
}
|
|
with open("configs/config.j2", "r") as template_file:
|
|
template = Template(template_file.read())
|
|
configuration = template.render(config_dict)
|
|
return configuration
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
format_string = "%(asctime)s - %(levelname)s - %(message)s"
|
|
stdout_handler = logging.StreamHandler()
|
|
stdout_formatter = colorlog.ColoredFormatter("%(log_color)s{}".format(format_string))
|
|
stdout_handler.setFormatter(stdout_formatter)
|
|
logger.addHandler(stdout_handler)
|
|
logger.setLevel(logging.DEBUG)
|
|
|
|
parser = argparse.ArgumentParser(description="Script de déploiement de la configuration des switchs")
|
|
parser.add_argument("-d", "--dry-run", action="store_true", help="Génère la nouvelle configuration et affiche le diff \
|
|
sans tenter de l'appliquer")
|
|
parser.add_argument("-w", "--whole", action="store_true", help="Affiche la configuration en entier au lieu du diff")
|
|
parser.add_argument("switch_name", type=str, help="Génère le template de ce switch")
|
|
parser.add_argument("-H", "--host", type=str, required=False, help="Host sur lequel de déployer la configuration au lieu de l'adresse dans le template")
|
|
args = parser.parse_args()
|
|
|
|
logger.debug("Loading master config")
|
|
master_config = yaml.load(open("configs/master.yml", "r"), yaml.Loader)
|
|
logger.debug("Loading config for {}".format(args.switch_name))
|
|
switch_config = yaml.load(open("configs/switches/{}.yml".format(args.switch_name), "r"), yaml.Loader)
|
|
|
|
if args.host:
|
|
switch_address = args.host
|
|
else:
|
|
switch_address = switch_config.get("ipv6-addr") or switch_config.get("ipv4-addr")
|
|
if switch_address is None:
|
|
switch_address = "{}.switches.crans.org".format(args.switch_name)
|
|
|
|
logger.info("Connecting to {} with address {}".format(args.switch_name, switch_address))
|
|
session = connect_to_switch(switch_address, user="root", key=master_config.get("ssh_private_key"))
|
|
old_config = sftp_read_file(session, "cfg/running-config").decode("utf-8")
|
|
|
|
logging.info("Generating configuration for {}".format(args.switch_name))
|
|
configuration = gen_conf(master_config, switch_config, old_config)
|
|
for line in difflib.unified_diff(old_config.split("\n"), configuration.split("\n"), fromfile='origin', tofile='new', lineterm=""):
|
|
print(line)
|
|
if args.dry_run or input("Voulez-vous déployer la configuration sur le switch ? y/[n]").lower() not in ["o","y"]:
|
|
logger.info("Aborting deployement")
|
|
exit(0)
|
|
with open("/tmp/conf_tmp", "w") as conf_tmp:
|
|
conf_tmp.write(configuration)
|
|
del session
|
|
logging.info("Uploading configuration for {}".format(args.switch_name))
|
|
session = connect_to_switch(switch_address, user="crans", key=master_config.get("ssh_private_key"))
|
|
sftp_write_file(session, "/tmp/conf_tmp", "cfg/startup-config")
|
|
logging.info("Deployement done for {}".format(args.switch_name))
|