You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
switchs/main.py

327 lines
13 KiB
Python

#!/usr/bin/env python3
#coding:utf-8
from configparser import ConfigParser
import socket
from re2oapi import Re2oAPIClient
from jinja2 import Environment, FileSystemLoader
import requests
import base64
import json
import datetime
import sys
# Usage text printed by --help.
HELP_MESSAGE = """
Provisionning script from re2o.
python3 main.py [--help]
[--force] [--upgrade] [--limit <switchs>]
--help
Print this message
--force
Force rewrite the switch config. If not applied, the script only reconfigure the switchs that
re2o indicate as needing regeneration.
--upgrade
Upgrade the switch config
--limit <switchs>
Limit the scripts to the given list of switchs.
the list of switchs is a coma separated list of switch short name, whitout spaces.
exp: python3 main.py --force --limit sw-ga-1,sw-ga-2
"""

# --help is handled here, before the configuration file is read and before any
# API call is made, so that printing the usage needs neither of them.
if "--help" in sys.argv:
    print(HELP_MESSAGE)
    # sys.exit() rather than the interactive-only exit() helper, which is
    # injected by the `site` module and is not guaranteed to exist.
    sys.exit(0)
# Read the re2o API credentials from the [Re2o] section of config.ini
# (path is relative to the current working directory).
config = ConfigParser()
config.read('config.ini')
api_hostname, api_password, api_username = (
    config.get('Re2o', option)
    for option in ('hostname', 'password', 'username')
)
api_client = Re2oAPIClient(
    api_hostname,
    api_username,
    api_password,
    use_tls=True,
)

# Host name of this machine without its domain part; it is compared with the
# 'hostname' field of the regeneration services further down.
client_hostname = socket.gethostname().partition('.')[0]

# Description of every switch, as returned by the API.
all_switchs = api_client.list("switchs/ports-config/")

# Jinja environment; templates are looked up from the current directory.
ENV = Environment(loader=FileSystemLoader('.'))

# The final configuration is rendered by the template engine from the values
# gathered for each switch (see the Switch class).
class Switch:
    """Generate, write, push and upgrade the configuration of a switch.

    One instance is reused for several switches: the caller assigns a switch
    description (one entry of the "switchs/ports-config/" API listing) to
    ``self.switch`` and then calls ``gen_conf_and_write()``, ``apply_conf()``
    or ``upgrade()``.
    """

    def __init__(self):
        """Fetch the VLAN list, the topology settings and the HP template."""
        # Extra values handed to the template; filled by preprocess_hp().
        self.additionals = None
        self.all_vlans = api_client.list("machines/vlan/")
        self.settings = api_client.view("preferences/optionaltopologie/")
        # Load the template once; it is rendered for every switch.
        self.hp_tpl = ENV.get_template("templates/hp.tpl")
        self.conf = None
        self.name = None
        self.switch = None
        self.headers = None
        self.creds_dict = None
        # Firmware versions read from the switch by get_firmware_hp().
        self.primary_firmware = None
        self.secondary_firmware = None

    def _is_hp_like(self):
        """Return True when the switch has a model built by HP or Aruba."""
        model = self.switch["model"]
        if not model:
            return False
        constructor = model["constructor"].lower()
        return "hp" in constructor or "aruba" in constructor

    def _rest_url(self, path):
        """Return the URL of ``path`` on the REST interface of the switch."""
        return "http://" + self.switch["ipv4"] + path

    def get_conf_file_name(self):
        """Return the name of the generated configuration file."""
        return self.switch["short_name"] + ".conf"

    def preprocess_hp(self):
        """Precompute the values handed to the Jinja template.

        Stores the result in ``self.additionals``: the ports grouped by VLAN
        (with the management addresses of the switch attached to their VLAN),
        the VLAN ids having each protection option enabled, and the ports
        having ra_guard / loop_protect enabled.

        Raises:
            RuntimeError: when a management subnet (IPv4 or IPv6) belongs to
                a VLAN that none of the switch ports carries.
        """
        vlans = {}

        def add_to_vlans(vlan, port, tagged=True):
            # Create the VLAN entry on first use, then record the port.
            entry = vlans.setdefault(
                vlan['vlan_id'],
                {'ports_tagged': [], 'ports_untagged': [], 'name': vlan['name']},
            )
            key = 'ports_tagged' if tagged else 'ports_untagged'
            entry[key].append(str(port['port']))

        for port in self.switch['ports']:
            profile = port['get_port_profile']
            if profile['vlan_untagged']:
                add_to_vlans(profile['vlan_untagged'], port, tagged=False)
            for vlan in profile['vlan_tagged'] or ():
                add_to_vlans(vlan, port)

        # Attach the addresses and their subnet to the VLAN they live in.
        # The same safety check applies to both families: the original code
        # only checked IPv4 and failed with a bare KeyError on IPv6.
        families = (("ipv4", "interfaces_subnet"), ("ipv6", "interfaces6_subnet"))
        for family, switch_key in families:
            for address, subnet in self.switch[switch_key].items():
                if subnet["vlan_id"] not in vlans:
                    raise RuntimeError("La config est dangeureuse, le vlan d'administration n'est pas propagé au switch !")
                vlans[subnet["vlan_id"]].setdefault(family, {})[address] = subnet

        def vlan_ids_with(option):
            return [vlan["vlan_id"] for vlan in self.all_vlans if vlan[option]]

        def ports_with(option):
            return [
                str(port['port'])
                for port in self.switch['ports']
                if port['get_port_profile'][option]
            ]

        self.additionals = {
            'ra_guarded': ports_with('ra_guard'),
            'loop_protected': ports_with('loop_protect'),
            'vlans': vlans,
            'arp_protect_vlans': vlan_ids_with("arp_protect"),
            'dhcp_snooping_vlans': vlan_ids_with("dhcp_snooping"),
            'dhcpv6_snooping_vlans': vlan_ids_with("dhcpv6_snooping"),
            'igmp_vlans': vlan_ids_with("igmp"),
            'mld_vlans': vlan_ids_with("mld"),
        }

    def gen_conf_hp(self):
        """Render the HP template for the current switch into ``self.conf``."""
        self.preprocess_hp()
        self.conf = self.hp_tpl.render(
            switch=self.switch,
            settings=self.settings,
            additionals=self.additionals,
            date_gen=datetime.datetime.now(),
        )

    def check_and_get_login(self):
        """Load the switch credentials; return False when there are none."""
        self.creds_dict = self.switch["get_management_cred_value"]
        return bool(self.creds_dict)

    def login_hp(self):
        """Open a REST session on the switch and keep its cookie."""
        payload_login = {
            "userName": self.creds_dict["id"],
            "password": self.creds_dict["pass"],
        }
        response = requests.post(
            self._rest_url("/rest/v3/login-sessions"),
            data=json.dumps(payload_login),
        )
        # The session cookie is sent back with every later request.
        self.headers = {"Cookie": response.json()['cookie']}

    def test(self):
        """Query the global status of the switch with the current session."""
        requests.get(
            self._rest_url("/rest/v4/system/status/global_info"),
            headers=self.headers,
        )

    def apply_conf_hp(self):
        """Ask the switch to restore its configuration file via REST.

        Raises:
            RuntimeError: when the "switchs_provision" setting is neither
                "tftp" nor "sftp".
        """
        url_restore = self._rest_url("/rest/v4/system/config/cfg_restore")
        provision_mode = self.settings["switchs_provision"]
        server_types = {"tftp": "ST_TFTP", "sftp": "ST_SFTP"}
        if provision_mode not in server_types:
            # Fail with an explicit message instead of an unbound local.
            raise RuntimeError(
                "Unsupported provisioning mode %r for switch %s"
                % (provision_mode, self.switch["short_name"])
            )

        server = {
            "server_address": {
                "ip_address": {
                    "version": "IAV_IP_V4",
                    "octets": self.settings["switchs_management_interface_ip"],
                },
            },
        }
        if provision_mode == "sftp":
            sftp_creds = self.settings["switchs_management_sftp_creds"]
            server["user_name"] = sftp_creds["login"]
            server["password"] = sftp_creds["pass"]

        data = {
            "server_type": server_types[provision_mode],
            "file_name": self.get_conf_file_name(),
            # NOTE(review): the SFTP payload also uses the
            # "tftp_server_address" key, exactly as before -- confirm against
            # the switch REST API schema.
            "tftp_server_address": server,
            "is_forced_reboot_enabled": True,
        }
        requests.post(url_restore, data=json.dumps(data), headers=self.headers)

    def gen_conf_and_write(self):
        """Generate the configuration for the switch vendor and write it.

        Nothing is done for a switch without a model or with an unsupported
        constructor. A RuntimeError raised during generation is reported on
        stdout and not propagated.
        """
        if not self._is_hp_like():
            return
        try:
            self.gen_conf_hp()
            self.write_conf()
        except RuntimeError as e:
            print("Il y a eu une erreur pour le switch %s, la config proposée n'est pas intègre" % self.switch["short_name"])
            print(e)

    def apply_conf(self):
        """Push the configuration to the switch when provisioning is allowed.

        Requires credentials, an HP/Aruba model, the switch flag
        "automatic_provision" and the global "provision_switchs_enabled"
        setting.
        """
        if not self.check_and_get_login():
            return
        if (self._is_hp_like()
                and self.switch["automatic_provision"]
                and self.settings["provision_switchs_enabled"]):
            self.login_hp()
            self.apply_conf_hp()

    def write_conf(self):
        """Write the rendered configuration into the generated/ directory."""
        with open("generated/" + self.get_conf_file_name(), 'w+') as f:
            f.write(self.conf)

    def upgrade(self):
        """Upgrade the switch firmware according to its constructor.

        The switch is skipped when it has no supported model, no target
        firmware or no credentials. A RuntimeError is reported on stdout and
        not propagated.
        """
        if not self._is_hp_like():
            return
        if not self.switch["firmware"]:
            # No target firmware configured: nothing to compare against.
            return
        if not self.check_and_get_login():
            return
        try:
            # A REST session is required before the firmware can be queried.
            self.login_hp()
            self.get_firmware_hp()
            # NOTE(review): this is a plain string comparison of the version
            # names, as in the original code.
            if self.primary_firmware < self.switch["firmware"]:
                self.upgrade_hp()
        except RuntimeError as e:
            print("Il y a eu une erreur pour la mise à jour du switch " + self.switch["short_name"])
            print(e)

    def get_firmware_hp(self):
        """Read the primary and secondary firmware versions of the switch.

        Runs "show flash" through the REST CLI endpoint and stores the parsed
        versions in ``self.primary_firmware`` and ``self.secondary_firmware``.
        """
        data = {
            "cmd": "show flash"
        }
        post_cmd = requests.post(
            self._rest_url("/rest/v4/cli"),
            data=json.dumps(data),
            headers=self.headers,
        )
        # The command output comes back base64-encoded.
        result_decode = base64.b64decode(post_cmd.json()['result_base64_encoded']).decode('utf-8')
        # Each version is the last word found before the next section title.
        self.primary_firmware = result_decode.split('Secondary')[0].split()[-1]
        self.secondary_firmware = result_decode.split('Boot')[0].split()[-1]

    def upgrade_hp(self):
        """Ask the switch to download its target firmware via REST."""
        data = {
            "file_type": "FTT_FIRMWARE",
            "url": "http://" + self.settings["switchs_management_interface_ip"] + "/" + self.switch["firmware"] + ".swi",
            "action": "FTA_DOWNLOAD",
            "boot_image": "BI_PRIMARY_IMAGE"
        }
        requests.post(
            self._rest_url("/rest/v4/file-transfer"),
            data=json.dumps(data),
            headers=self.headers,
        )

    def reboot_hp(self):
        """Ask the switch to reload via REST."""
        data = {
            "action": "reload"
        }
        requests.post(
            self._rest_url("/rest/v4/system/reboot"),
            data=json.dumps(data),
            headers=self.headers,
        )
def parse_limit_switchs(argv):
    """Return the switch short names given after --limit, or None.

    Args:
        argv: the command line, as in ``sys.argv``.

    Returns:
        None when --limit is absent; otherwise the set of non-empty,
        whitespace-stripped names of the comma separated list following it.

    Raises:
        RuntimeError: when --limit is the last argument or is directly
            followed by another option instead of a list.
    """
    if "--limit" not in argv:
        return None
    list_index = argv.index("--limit") + 1
    # A following "--option" is a flag, not a list of switch names.
    if len(argv) <= list_index or argv[list_index].startswith("--"):
        raise RuntimeError("--limit must be followed by a comma separated list of switch")
    stripped = (name.strip() for name in argv[list_index].split(','))
    return {name for name in stripped if name}


limit_switchs = parse_limit_switchs(sys.argv)
limit = limit_switchs is not None
if limit:
    found_switchs = {x['short_name'] for x in all_switchs}
    # Keep only the requested switches for every action below.
    all_switchs = [x for x in all_switchs if x['short_name'] in limit_switchs]
    missing_switchs = limit_switchs - found_switchs
    if missing_switchs:
        print("WARNING: some switchs in the list have not been found in the re2o API and will not be processed")
        for sw_name in sorted(missing_switchs):
            print("{name} has not been found".format(name=sw_name))
# --force: regenerate and apply the configuration of every selected switch,
# whether or not re2o asked for a regeneration.
if "--force" in sys.argv:
    # A single Switch object is shared; only its `switch` attribute changes.
    sw = Switch()
    for switch in all_switchs:
        sw.switch = switch
        sw.gen_conf_and_write()
        try:
            sw.apply_conf()
        except Exception as e:
            # Report the failure and carry on with the next switch.
            print(
                "Erreur dans l'application de la conf pour " + switch["short_name"],
                e,
                sep="\n",
            )
# --upgrade: upgrade the firmware of every selected switch.
if "--upgrade" in sys.argv:
    sw = Switch()
    for switch in all_switchs:
        sw.switch = switch
        try:
            sw.upgrade()
        except Exception as e:
            # Same policy as the --force loop: one failing switch must not
            # abort the remaining switches nor the regeneration step below.
            print("Erreur dans la mise à jour pour " + switch["short_name"])
            print(e)
# Regenerate the configurations when re2o flags the 'switchs' service of this
# host as needing it, then report the outcome back to re2o.
for service in api_client.list("services/regen/"):
    if (service['hostname'] == client_hostname
            and service['service_name'] == 'switchs'
            and service['need_regen']):
        error = False
        sw = Switch()
        for switch in all_switchs:
            sw.switch = switch
            sw.gen_conf_and_write()
            try:
                sw.apply_conf()
            except Exception as e:
                # Catch Exception rather than everything, so Ctrl-C and
                # sys.exit() still stop the script, and report the failure
                # instead of hiding it.
                error = True
                print("Erreur dans l'application de la conf pour " + switch["short_name"])
                print(e)
        # need_regen stays set when at least one switch failed, so the
        # regeneration is attempted again on the next run.
        api_client.patch(service['api_url'], data={'need_regen': error})