"""Generate a switch configuration and deploy it over SFTP.

The configuration is rendered from the Jinja2 template ``configs/config.j2``
using either local YAML files (``configs/``) or data fetched from a re2o
instance (``--re2o``). The script shows a diff against the running
configuration and, unless ``--dry-run`` is given, uploads the result as the
startup configuration after confirmation.
"""
import argparse
import difflib
import getpass
import logging
import sys

import colorlog
import requests
import termcolor
import yaml
from jinja2 import Template

from core import connect_to_switch, sftp_read_file, sftp_write_file

logger = logging.getLogger()


def _load_yaml(path):
    """Load a YAML file from *path*, making sure the file handle is closed."""
    # NOTE: yaml.Loader can build arbitrary Python objects; this is only
    # acceptable because the files under configs/ are trusted local files.
    with open(path, "r") as yaml_file:
        return yaml.load(yaml_file, yaml.Loader)


def _format_run(first, last):
    """Format a run of consecutive numbers as ``"first"`` or ``"first-last"``."""
    if first == last:
        return str(first)
    return "{}-{}".format(first, last)


def syntax_from_range(value, vlan_syntax=False):
    """Compress a collection of numbers into the switch range syntax.

    Consecutive numbers are collapsed into ``first-last`` runs. Duplicates are
    ignored and the input does not need to be sorted.

    :param value: iterable of integers (port or vlan numbers)
    :param vlan_syntax: when true, runs are separated by spaces and a trailing
        space is appended; otherwise runs are separated by commas
    :return: the range string, or ``None`` when *value* is empty
    """
    if not value:
        return None
    separator = " " if vlan_syntax else ","
    runs = []
    first = last = None
    # Sorting is required: iteration order of a set is not guaranteed.
    for number in sorted(set(value)):
        if last is not None and number == last + 1:
            last = number
            continue
        if first is not None:
            runs.append(_format_run(first, last))
        first = last = number
    runs.append(_format_run(first, last))
    syntax = separator.join(runs)
    if vlan_syntax:
        syntax += " "
    return syntax


def range_from_syntax(value, vlan_syntax=False):
    """Expand a switch range string into the list of numbers it denotes.

    :param value: range string such as ``"1-3,5"`` (or ``"1-3 5"`` with
        *vlan_syntax*); a bare integer is accepted too, ``None`` means empty
    :param vlan_syntax: when true, items are separated by whitespace instead
        of commas
    :return: list of integers, in the order they appear in *value*
    :raises RuntimeError: if an item contains more than one ``-``
    :raises ValueError: if a bound is not an integer
    """
    if value is None:
        return []
    text = str(value)
    tokens = text.split() if vlan_syntax else text.split(",")
    numbers = []
    for token in tokens:
        token = token.strip()
        if not token:
            # Tolerate trailing separators, e.g. the trailing space emitted
            # by syntax_from_range(..., vlan_syntax=True).
            continue
        bounds = token.split("-")
        if len(bounds) == 2:
            numbers.extend(range(int(bounds[0]), int(bounds[1]) + 1))
        elif len(bounds) == 1:
            numbers.append(int(bounds[0]))
        else:
            raise RuntimeError("Invalid range syntax: {!r}".format(token))
    return numbers


def gen_vlan(vlan_number, switch_config, dhcp_snooping_vlans):
    """Build the template data for one vlan from ``configs/vlans/<n>.yml``.

    :param vlan_number: vlan id, used to locate the YAML file
    :param switch_config: switch configuration dict; its ``ipv4-*`` and
        ``ipv6-*`` keys are used when the vlan has ``has_ip`` set
    :param dhcp_snooping_vlans: list that is extended in place with the vlan
        id when DHCP snooping is enabled for it (enabled by default)
    :return: ``(vlan, dhcp_snooping_vlans)``
    :raises RuntimeError: if the vlan needs an IP but the switch has none
    """
    vlan_config = _load_yaml("configs/vlans/{}.yml".format(vlan_number))
    vlan = {
        "name": vlan_config.get("name"),
        "tagged": [],
        "untagged": [],
    }
    if vlan_config.get("dhcp_snooping", True):
        dhcp_snooping_vlans.append(int(vlan_number))
    if vlan_config.get("has_ip"):
        addresses = {}
        v4 = switch_config.get("ipv4-addr")
        v4_sub = switch_config.get("ipv4-subnet")
        if v4 is not None and v4_sub is not None:
            logger.debug("Adding %s %s to vlan %s", v4, v4_sub, vlan_number)
            addresses["addr"] = v4
            addresses["subnet"] = v4_sub
        v6 = switch_config.get("ipv6-addr")
        v6_sub = switch_config.get("ipv6-subnet")
        if v6 is not None and v6_sub is not None:
            logger.debug("Adding %s/%s to vlan %s", v6, v6_sub, vlan_number)
            addresses["addr6"] = v6
            addresses["subnet6"] = v6_sub
        vlan["ip"] = addresses
        if not addresses:
            message = "Missing IPs or subnets configurations for vlan {}".format(vlan_number)
            logger.critical(message)
            raise RuntimeError(message)
    return vlan, dhcp_snooping_vlans


def gen_interfaces(switch_config):
    """Build interface and vlan template data from the YAML switch config.

    Each entry of ``switch_config["interfaces"]`` maps a port range to a
    profile loaded from ``configs/profiles/<profile>.yml``.

    :param switch_config: switch configuration dict (``interfaces`` and
        ``nb_ports`` keys are read here)
    :return: ``(interfaces, vlans, mac_based_ports, ra_guard_ports,
        dhcp_snooping_vlans)``
    :raises RuntimeError: if the configured ports do not match ``nb_ports``
    """
    dhcp_snooping_vlans = []
    mac_based_ports = []
    ra_guard_ports = []
    ifaces_done = set()
    interfaces = []
    vlans = {}

    for iface_range, if_config in switch_config.get("interfaces").items():
        if_profile = _load_yaml("configs/profiles/{}.yml".format(if_config.get("profile")))
        profile_vlans = if_profile.get("vlans") or {}
        untagged_vlans = range_from_syntax(profile_vlans.get("untagged"))
        tagged_vlans = range_from_syntax(profile_vlans.get("tagged"))

        for iface_number in range_from_syntax(iface_range):
            if not if_profile.get("ignore_port", False):
                interface = {"number": iface_number}
                generic = if_profile.get("generic_name", None)
                if generic is None:
                    interface["name"] = if_config["name"]
                else:
                    interface["name"] = "TODO {} {}".format(generic, iface_number)
                interface["mac_based"] = if_profile.get("mac_based", False)
                if interface["mac_based"]:
                    mac_based_ports.append(iface_number)
                if if_profile.get("ra_guard", True):
                    ra_guard_ports.append(iface_number)
                interface["addr_limit"] = if_profile.get("addr_limit", 5)
                interface["logoff"] = if_profile.get("logoff", 3600)
                interfaces.append(interface)
            # NOTE(review): ignored ports are assumed to still count towards
            # nb_ports and to still receive their vlan membership -- confirm.
            ifaces_done.add(iface_number)
            for vlan in untagged_vlans:
                if vlans.get(vlan) is None:
                    vlans[vlan], dhcp_snooping_vlans = gen_vlan(vlan, switch_config, dhcp_snooping_vlans)
                vlans[vlan]["untagged"].append(iface_number)
            for vlan in tagged_vlans:
                if vlans.get(vlan) is None:
                    vlans[vlan], dhcp_snooping_vlans = gen_vlan(vlan, switch_config, dhcp_snooping_vlans)
                vlans[vlan]["tagged"].append(iface_number)

    # Convert the per-vlan port lists to the switch range syntax.
    for v_conf in vlans.values():
        v_conf["tagged"] = syntax_from_range(v_conf["tagged"])
        v_conf["untagged"] = syntax_from_range(v_conf["untagged"])

    nb_ports = switch_config.get("nb_ports")
    if not ifaces_done or max(ifaces_done) != nb_ports or len(ifaces_done) != nb_ports:
        raise RuntimeError(
            "Configured interfaces do not match nb_ports={}".format(nb_ports)
        )
    interfaces.sort(key=lambda x: x["number"])
    return interfaces, vlans, mac_based_ports, ra_guard_ports, dhcp_snooping_vlans


def get_header(old_config):
    """Return the first two lines of *old_config*, joined by a newline."""
    return "\n".join(old_config.split("\n")[:2])


def conf_from_dict(config_dict):
    """Render ``configs/config.j2`` with *config_dict* and return the text."""
    with open("configs/config.j2", "r") as template_file:
        template = Template(template_file.read())
    return template.render(config_dict)


def gen_conf(master_config, switch_config, header):
    """Generate the configuration text from the local YAML configuration.

    :param master_config: site-wide settings (``configs/master.yml``)
    :param switch_config: per-switch settings; ``location`` is mandatory
    :param header: header lines taken from the current switch configuration
    :return: rendered configuration
    """
    interfaces, vlans, mac_based_ports, ra_guard_ports, dhcp_snooping_vlans = gen_interfaces(switch_config)
    config_dict = {
        "header": header,
        "hostname": switch_config.get("hostname"),
        "dhcp_servers": master_config.get("dhcp_servers"),
        "dhcpv6_servers": master_config.get("dhcpv6_servers"),
        "snmp_user": master_config.get("snmp_user"),
        "interfaces": interfaces,
        "vlans": vlans,
        "sntp": master_config.get("sntp_servers"),
        "ipv4_managers": master_config.get("managers_v4", {}),
        "ipv6_managers": master_config.get("managers_v6", {}),
        "dns": master_config.get("dns"),
        "location": switch_config["location"],
        "radius_servers": master_config.get("radius_servers"),
        "mac_based_ports": syntax_from_range(mac_based_ports),
        "ra_guard_ports": syntax_from_range(ra_guard_ports),
        "unauth_redirect": master_config.get("unauth_redirect"),
        "dhcp_snooping_vlans": syntax_from_range(dhcp_snooping_vlans, vlan_syntax=True),
    }
    return conf_from_dict(config_dict)


def gen_conf_re2o(re2o_config, header, master_config=None):
    """Generate the configuration text from data fetched from re2o.

    :param re2o_config: switch description as built by
        :func:`get_switch_from_re2o`
    :param header: header lines taken from the current switch configuration
    :param master_config: site-wide settings (``configs/master.yml``). When
        omitted, the module-level ``master_config`` set by the script entry
        point is used, which is what older callers relied on.
    :return: rendered configuration
    """
    if master_config is None:
        master_config = globals().get("master_config") or {}

    mgmt_utils = re2o_config.get("switchs_management_utils")

    ipv4_managers = {}
    for subnet in mgmt_utils.get("subnet") or []:
        ipv4_managers[subnet.get("network")] = {
            "ip": subnet.get("network"),
            "subnet": subnet.get("netmask"),
        }

    # Unlike "subnet", "subnet6" is a single subnet, not a list of subnets.
    ipv6_managers = {}
    subnet6 = mgmt_utils.get("subnet6")
    if subnet6 is not None:
        ipv6_managers[subnet6.get("network")] = {
            "ip": subnet6.get("network"),
            "subnet": subnet6.get("netmask"),
        }

    ports = re2o_config.get("ports")
    vlans = {}
    dhcp_snooping_vlans = []
    dhcpv6_snooping_vlans = []
    arp_protect = {"vlans": []}

    for vlan in re2o_config.get("vlans"):
        vlan_id = vlan["vlan_id"]
        # Work on a copy so the data received from re2o is left untouched.
        vlan_entry = dict(vlan)
        vlans[vlan_id] = vlan_entry

        # Collect the ports carrying this vlan, tagged and untagged.
        range_tagged = []
        range_untagged = []
        for port in ports:
            port_profile = port["get_port_profile"]
            for tagged in port_profile["vlan_tagged"]:
                if tagged["vlan_id"] == vlan_id:
                    range_tagged.append(port["port"])
            # A port has at most one untagged vlan.
            untagged = port_profile["vlan_untagged"]
            if untagged is not None and untagged["vlan_id"] == vlan_id:
                range_untagged.append(port["port"])
        vlan_entry["tagged"] = syntax_from_range(range_tagged)
        vlan_entry["untagged"] = syntax_from_range(range_untagged)

        # Attach the IP addresses to the vlans that have some.
        for address, ifaces in re2o_config.get("interfaces_subnet", {}).items():
            # Each IPv4 address maps to a list of interfaces.
            for iface in ifaces:
                if iface["vlan_id"] == vlan_id:
                    if vlan_entry.get("ip") is None:
                        vlan_entry["ip"] = {}
                    vlan_entry["ip"]["addr"] = address
                    vlan_entry["ip"]["subnet"] = iface["netmask"]
        for address, iface in re2o_config.get("interfaces6_subnet", {}).items():
            if iface["vlan_id"] == vlan_id:
                if vlan_entry.get("ip") is None:
                    vlan_entry["ip"] = {}
                vlan_entry["ip"]["addr6"] = address
                vlan_entry["ip"]["subnet6"] = iface["netmask_cidr"]

        # Record the vlans that need DHCP snooping / ARP protection.
        if vlan["dhcp_snooping"]:
            dhcp_snooping_vlans.append(vlan_id)
        if vlan["dhcpv6_snooping"]:
            dhcpv6_snooping_vlans.append(vlan_id)
        if vlan["arp_protect"]:
            arp_protect["vlans"].append(vlan_id)
        vlan_entry["ipv6_mld"] = vlan["mld"]

    # Gather the per-port settings.
    mac_based_ports = []
    ra_guard_ports = []
    interfaces = []
    loop_protect = {"ports": []}
    for port in ports:
        port_profile = port["get_port_profile"]
        if port_profile["ra_guard"]:
            ra_guard_ports.append(port["port"])
        if port_profile["loop_protect"]:
            loop_protect["ports"].append(port["port"])
        iface = {
            "name": port["pretty_name"],
            "number": port["port"],
            "dhcp_trust": not port_profile["dhcp_snooping"],
            "dhcpv6_trust": not port_profile["dhcpv6_snooping"],
            "flowcontrol": port_profile["flow_control"],
            "arp_trust": not port_profile["arp_protect"],
        }
        if port_profile["radius_type"] == "MAC-radius":
            mac_based_ports.append(port["port"])
            iface["mac_based"] = True
            iface["addr_limit"] = port_profile["mac_limit"]
            iface["logoff"] = master_config.get("radius_logoff")
        interfaces.append(iface)

    loop_protect["ports"] = syntax_from_range(loop_protect["ports"])
    arp_protect["vlans"] = syntax_from_range(arp_protect["vlans"], vlan_syntax=True)
    interfaces.sort(key=lambda x: x["number"])

    radius_key = re2o_config.get("get_radius_key_value")
    radius_addresses = mgmt_utils["radius_servers"]["ipv4"] + mgmt_utils["radius_servers"]["ipv6"]
    radius_servers = [{"ip": address, "secret": radius_key} for address in radius_addresses]

    config_dict = {
        "header": header,
        "location": re2o_config.get("switchbay").get("name"),
        "hostname": re2o_config.get("short_name"),
        "dhcp_servers": mgmt_utils.get("dhcp_servers").get("ipv4"),
        "dhcpv6_servers": mgmt_utils.get("dhcp_servers").get("ipv6"),
        "ipv4_managers": ipv4_managers,
        "ipv6_managers": ipv6_managers,
        "vlans": vlans,
        "dhcp_snooping_vlans": syntax_from_range(dhcp_snooping_vlans, vlan_syntax=True),
        "dhcpv6_snooping_vlans": syntax_from_range(dhcpv6_snooping_vlans, vlan_syntax=True),
        "logging": mgmt_utils["log_servers"]["ipv4"] + mgmt_utils["log_servers"]["ipv6"],
        "sntp": mgmt_utils["ntp_servers"]["ipv4"] + mgmt_utils["ntp_servers"]["ipv6"],
        "dns": mgmt_utils["dns_recursive_servers"]["ipv4"] + mgmt_utils["dns_recursive_servers"]["ipv6"],
        "radius_servers": radius_servers,
        "snmp_user": master_config.get("snmp_user"),
        "unauth_redirect": master_config.get("unauth_redirect"),
        "mac_based_ports": syntax_from_range(mac_based_ports),
        "ra_guard_ports": syntax_from_range(ra_guard_ports),
        "loop_protect": loop_protect,
        "arp_protect": arp_protect,
        "interfaces": interfaces,
    }
    return conf_from_dict(config_dict)


def connect_as_self(base_url, username=None):
    """Get a re2o token, by default for the current unix user.

    The password is asked interactively. The process exits with status 1 if
    the API does not answer with HTTP 200.

    :param base_url: re2o API base URL, with a trailing slash
    :param username: re2o login; defaults to the current unix user
    :return: the value of the ``token`` field of the JSON answer
    """
    if username is None:
        username = getpass.getuser()
    password = getpass.getpass("Login to {} as {} with password :".format(base_url, username))
    r = requests.post(
        base_url + "token-auth",
        data={
            "username": username,
            "password": password,
        }
    )
    if r.status_code != 200:
        logger.critical("Wrong login/password")
        sys.exit(1)
    return r.json().get("token")


def get_switch_from_results(results, switch_name):
    """Return the entry of *results* whose ``short_name`` is *switch_name*.

    Returns ``None`` when no entry matches.
    """
    return next((s for s in results if s.get("short_name") == switch_name), None)


def _get_json(url, headers):
    """GET *url* and return the decoded JSON body, raising on HTTP errors."""
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.json()


def get_switch_from_re2o(re2o_instance, switch_name, re2o_user):
    """Fetch the description of *switch_name* from a re2o instance.

    The paginated ``switchs/ports-config`` endpoint is walked until the switch
    is found; the result is then completed with the global topology
    preferences and the list of vlans.

    :param re2o_instance: base URL of the re2o instance, without ``/api/``
    :param switch_name: ``short_name`` of the switch to look for
    :param re2o_user: re2o login, or ``None`` for the current unix user
    :return: dict describing the switch
    :raises RuntimeError: if the switch is not known to re2o
    """
    base_url = "{}/api/".format(re2o_instance)
    token = connect_as_self(base_url, re2o_user)
    headers = {"Authorization": "Token " + token}

    # Look for the switch, following pagination until it is found.
    page = _get_json(base_url + "switchs/ports-config", headers)
    sw_config = get_switch_from_results(page["results"], switch_name)
    while sw_config is None and page.get("next"):
        page = _get_json(page["next"], headers)
        sw_config = get_switch_from_results(page["results"], switch_name)
    if sw_config is None:
        raise RuntimeError("Switch {} not found in re2o".format(switch_name))

    # Global settings: dhcp servers, managers, etc.
    sw_config.update(_get_json(base_url + "preferences/optionaltopologie", headers))
    # List of vlans.
    vlan_page = _get_json(base_url + "machines/vlan", headers)
    sw_config.update({"vlans": vlan_page.get("results")})
    return sw_config


if __name__ == "__main__":
    format_string = "%(asctime)s - %(levelname)s - %(message)s"
    stdout_handler = logging.StreamHandler()
    stdout_formatter = colorlog.ColoredFormatter("%(log_color)s{}".format(format_string))
    stdout_handler.setFormatter(stdout_formatter)
    logger.addHandler(stdout_handler)
    logger.setLevel(logging.DEBUG)

    parser = argparse.ArgumentParser(description="Script de déploiement de la configuration des switchs")
    parser.add_argument("-d", "--dry-run", action="store_true",
                        help="Génère la nouvelle configuration et affiche le diff "
                             "sans tenter de l'appliquer")
    parser.add_argument("-w", "--whole", action="store_true",
                        help="Affiche la configuration en entier au lieu du diff")
    parser.add_argument("switch_name", type=str, help="Génère le template de ce switch")
    parser.add_argument("-H", "--host", type=str, required=False,
                        help="Host sur lequel de déployer la configuration au lieu de l'adresse dans le template")
    parser.add_argument("-r", "--re2o", type=str, required=False,
                        help="Si renseigné, la configuration sera récupérée depuis l'instance de re2o indiquée au lieu d'utiliser les fichiers yaml")
    parser.add_argument("--re2o-user", type=str, required=False,
                        help="Permet de choisir l'user pour se connecter à l'api re2o, par défaut on prend l'user unix courant")
    parser.add_argument("-4", "--force-ipv4", action="store_true", help="Force le provisionning en ipv4")
    args = parser.parse_args()

    # The master config is needed whatever the source of the switch config.
    logger.debug("Loading master config")
    master_config = _load_yaml("configs/master.yml")
    if args.re2o:
        logger.debug("Loading config from re2o")
        re2o_config = get_switch_from_re2o(args.re2o, args.switch_name, args.re2o_user)
    else:
        logger.debug("Loading config for {}".format(args.switch_name))
        switch_config = _load_yaml("configs/switches/{}.yml".format(args.switch_name))

    # Address used to reach the switch: explicit host first, then the
    # configured IPv6/IPv4 address, then a name derived from the switch name.
    if args.host:
        switch_address = args.host
    elif args.re2o:
        if args.force_ipv4:
            switch_address = re2o_config.get("ipv4")
        else:
            switch_address = re2o_config.get("ipv6") or re2o_config.get("ipv4")
    else:
        if args.force_ipv4:
            switch_address = switch_config.get("ipv4-addr")
        else:
            switch_address = switch_config.get("ipv6-addr") or switch_config.get("ipv4-addr")
    if switch_address is None:
        switch_address = "{}.switches.crans.org".format(args.switch_name)  # TODO: crans-specific domain

    logger.info("Connecting to {} with address {}".format(args.switch_name, switch_address))
    session = connect_to_switch(switch_address, user="root", key=master_config.get("ssh_private_key"))  # TODO: specify the key path
    old_config = sftp_read_file(session, "cfg/running-config").decode("utf-8")
    header = get_header(old_config)

    # Generate the new configuration.
    logger.info("Generating configuration for {}".format(args.switch_name))
    if args.re2o:
        configuration = gen_conf_re2o(re2o_config, header, master_config)
    else:
        configuration = gen_conf(master_config, switch_config, header)

    if args.whole:
        # --whole: show the full generated configuration instead of the diff.
        print(configuration)
    else:
        # Show a coloured diff between the running and the new configuration.
        for line in difflib.unified_diff(old_config.split("\n"), configuration.split("\n"),
                                         fromfile='origin', tofile='new', lineterm=""):
            if line.startswith("-"):
                termcolor.cprint(line, "red")
            elif line.startswith("+"):
                termcolor.cprint(line, "green")
            elif line.startswith("@"):
                termcolor.cprint(line, "yellow")
            else:
                print(line)

    if args.dry_run or input("Voulez-vous déployer la configuration sur le switch ? y/[n]").lower() not in ["o", "y"]:
        logger.info("Aborting deployement")
        sys.exit(0)

    # NOTE(review): fixed, predictable path in /tmp -- consider tempfile.
    with open("/tmp/conf_tmp", "w") as conf_tmp:
        conf_tmp.write(configuration)
    del session
    logger.info("Uploading configuration for {}".format(args.switch_name))
    # NOTE(review): the read above connects as "root", the upload as "crans" -- confirm this is intended.
    session = connect_to_switch(switch_address, user="crans", key=master_config.get("ssh_private_key"))
    sftp_write_file(session, "/tmp/conf_tmp", "cfg/startup-config")
    logger.info("Deployement done for {}".format(args.switch_name))

# These two lines, hard-coded in the crans templates, are removed: they are
# two seemingly arbitrary multicast addresses.
# https://gitlab.crans.org/nounous/scripts/-/commit/af2d158ff1a99dee2f032a81bf14de4144d2b82f
#-filter multicast 01005e-0000fb drop 1-52
#-filter multicast 333300-0000fb drop 1-52