ansible/library/switch_config.py

390 lines
12 KiB
Python

#!/usr/bin/python
DOCUMENTATION = """
---
module: Switch
short_description: Allow the setup of switches using rest API
description: Allow the setup of switches using rest API
options:
config:
description: configuration to send to the switch.
required: true
type: dict
host:
description: host of switch.
required: false
type: str
password:
description: password of the user.
required: true
type: str
port:
description: port of rest api.
required: false
type: int
use_proxy:
description:
Use a proxy to communicate with the switch.
HTTP_PROXY or ALL_PROXY must be set.
required: false
type: bool
username:
description: username of the rest API.
required: true
type: str
version:
description: version of the rest API.
required: false
type: str
"""
EXAMPLES = """
- name: Setup switch name
switch_config:
username: test
password: 1234
host: 192.168.1.1
port: 80
version: v8
config:
path: system
data:
name: "SwitchName"
- name: Setup vlans
switch_config:
username: test
password: 1234
host: 192.168.1.1
port: 80
config:
path: vlans
create_method: POST
subpath:
- path: 42
data:
name: "TheAnswer"
vlan_id: 42
status: VS_PORT_BASED
type: VT_STATIC
"""
from ansible.module_utils.basic import AnsibleModule
import json
import os
import requests
class SwitchApi:
def __init__(self, port, host, use_proxy, api="v1"):
self.headers = {'Content-Type': 'application/json'}
self.url_base = f"http://{host}:{port}/rest/{api}"
self.proxies = None
if use_proxy:
http_proxy = os.getenv("HTTP_PROXY")
all_proxy = os.getenv("ALL_PROXY")
if http_proxy != "":
self.proxies = {'http': http_proxy}
elif all_proxy != "":
self.proxies = {'http': all_proxy}
def login(self, username, password):
"""
Log in to the rest api.
Return True if the connection has succeeded and False otherwise.
"""
data = {"userName": username, "password": password}
response = self.post("/login-sessions", data = json.dumps(data),)
if response.status_code != 201:
return False
data = response.json()
if not 'cookie' in data:
return False
self.headers['cookie'] = data['cookie']
return True
def logout(self):
"""
Log out of the rest api.
Return True if connection has succeeded and False otherwise
"""
response = self.delete("/login-sessions")
if response.status_code != 204:
return False
self.headers.pop('cookie')
return True
def post(self, url, data = None):
kwargs = {
"headers": self.headers
}
if data is not None:
kwargs["data"] = data
if self.proxies is not None:
kwargs["proxies"] = self.proxies
return requests.post(self.url_base + url, **kwargs)
def get(self, url, data = ""):
kwargs = {
"headers": self.headers
}
if data is not None:
kwargs["data"] = data
if self.proxies is not None:
kwargs["proxies"] = self.proxies
return requests.get(self.url_base + url, **kwargs)
def delete(self, url, data = ""):
kwargs = {
"headers": self.headers
}
if data is not None:
kwargs["data"] = data
if self.proxies is not None:
kwargs["proxies"] = self.proxies
return requests.delete(self.url_base + url, **kwargs)
def put(self, url, data = ""):
kwargs = {
"headers": self.headers
}
if data is not None:
kwargs["data"] = data
if self.proxies is not None:
kwargs["proxies"] = self.proxies
return requests.put(self.url_base + url, **kwargs)
def required_modification(current_conf, modification):
for k, v in modification.items():
if not k in current_conf:
return True
if current_conf[k] != v:
return True
return False
def configure(module, config, api, current_path="", create_method=None):
path = "/" + str(config["path"])
url = current_path + path
check_mode = module.check_mode
changed = False
before = {"path": path}
after = {"path": path}
if not "path" in config:
api.logout()
raise Exception("A path must be specified.")
# If removing configuration
if "delete" in config and config["delete"]:
# Get the configuration
response = api.get(url)
if response.status_code == 404:
before["delete"] = True
elif response.status_code in (200, 201, 202, 203, 204):
before["data"] = response.json()
else:
api.logout()
raise Exception(
"Failed to check the old configuration:",
f"Url: {response.url}",
f"Status code: {response.status_code}",
f"Response: {response.text}",
)
# If required, delete
if not "delete" in before and not check_mode:
response = api.delete(url)
if response.status_code >= 400:
api.logout()
raise Exception(
"Failed to delete:",
f"Url: {response.url}",
f"Status code: {response.status_code}",
f"Response: {response.text}",
)
else:
# Verify that everything is ok
response = api.get(url)
if response.status_code == 404:
changed = True
after["delete"] = True
elif response.status_code in (200, 201, 202, 203, 204):
after["data"] = response.json()
after["delete"] = False
else:
api.logout()
raise Exception(
"Failed to check the configuration after delete:",
f"Url: {response.url}",
f"Status code: {response.status_code}",
f"Response: {response.text}",
)
elif not "delete" in before:
after["delete"] = True
changed = True
else:
after["delete"] = True
# If create or edit
elif "data" in config and type(config["data"]) is dict:
# Get the configuration
response = api.get(url)
new_data = {}
if response.status_code == 404:
before["delete"] = True
elif response.status_code in (200, 201, 202, 203, 204):
before["data"] = response.json()
new_data = before["data"].copy()
else:
api.logout()
raise Exception(
"Failed to check the old configuration:",
f"Url: {response.url}",
f"Status code: {response.status_code}",
f"Response: {response.text}",
)
# If required, modify
if "delete" in before and not check_mode:
# Create
if create_method == "POST":
response = api.post(current_path, json.dumps(config["data"]))
elif create_method == "PUT":
response = api.put(current_path, json.dumps(config["data"]))
else:
api.logout()
raise Exception(
"Failed to create:",
"Variable create_method must be set to PUT or POST",
)
if response.status_code >= 400:
api.logout()
raise Exception(
"Failed to create:",
f"Url: {response.url}",
f"Status code: {response.status_code}",
f"Data: {config["data"]}",
f"Response: {response.text}",
)
after["data"] = response.json()
changed = True
elif not check_mode:
# Edit
if required_modification(before["data"], config["data"]):
response = api.put(url, json.dumps(config["data"]))
if response.status_code >= 400:
api.logout()
raise Exception(
"Failed to edit:",
f"Url: {response.url}",
f"Status code : {response.status_code}",
f"Data: {config["data"]}",
f"Response: {response.text}",
)
changed = True
after["data"] = response.json()
else:
after["data"] = before["data"].copy()
else:
if "delete" in before and create_method is None:
api.logout()
raise Exception(
"Failed to create:",
"Variable create_method must be set to PUT or POST",
)
new_data.update(config["data"])
after["data"] = new_data
changed = changed or (not "data" in before) or after["data"] != before["data"]
# Configure the subpaths
if "subpath" in config and type(config["subpath"]) is list:
create_method = None
if "create_method" in config:
create_method = config["create_method"]
before["subpath"] = []
after["subpath"] = []
for subconf in config["subpath"]:
response = configure(
module, subconf,
api,
current_path=url,
create_method=create_method
)
changed = changed or response["changed"]
before["subpath"].append(response["diff"]["before"])
after["subpath"].append(response["diff"]["after"])
return {
"changed": changed,
"diff": {"after": after, "before": before}
}
def run_module():
module_args = dict(
config=dict(type='dict', required=True),
username=dict(type='str', required=True),
password=dict(type='str', required=True, no_log=True),
port=dict(type='int', required=True),
host=dict(type='str', required=True),
version=dict(type='str', required=False, default='v1'),
use_proxy=dict(type='bool', required=False, default=False),
)
module = AnsibleModule(
argument_spec=module_args,
supports_check_mode=True
)
result = {
"changed": False,
}
# api connection
api = SwitchApi(
module.params["port"],
module.params["host"],
module.params["use_proxy"],
api = module.params["version"]
)
login_success = api.login(
module.params["username"],
module.params["password"],
)
if not login_success:
module.fail_json(msg='login failed', **result)
return
try:
response = configure(module, module.params["config"], api)
except Exception as msg:
module.fail_json(msg="\n".join(msg.args), **result)
return
api.logout()
result.update(response)
if module.check_mode:
module.exit_json(**result)
module.exit_json(**result)
def main():
run_module()
if __name__ == '__main__':
main()