Provisioning-switchs/core.py

91 lines
3 KiB
Python
Raw Permalink Normal View History

2020-08-14 13:34:51 +02:00
import logging
import os
import socket
from ssh2.session import Session
from ssh2.sftp import LIBSSH2_FXF_READ, LIBSSH2_SFTP_S_IRUSR, LIBSSH2_FXF_CREAT, LIBSSH2_FXF_WRITE, \
LIBSSH2_SFTP_S_IRGRP, LIBSSH2_SFTP_S_IWUSR, LIBSSH2_SFTP_S_IROTH
from ssh2.utils import wait_socket
logger = logging.getLogger()
def get_output(channel):
out = b""
size, data = channel.read(255)
while True:
out += data
if size < 255:
break
size, data = channel.read(255)
return out.decode()
def run_command(channel, command):
logger.debug("Running command {}".format(command))
channel.write(command + "\n")
get_output(channel) # random garbage after running a command.
return get_output(channel)
def sftp_read_file(session, path):
logger.debug("Reading file from {}".format(path))
sftp = session.sftp_init()
with sftp.open(path, LIBSSH2_FXF_READ, LIBSSH2_SFTP_S_IRUSR) as fh:
for size, data in fh:
pass
logger.debug("Done reading {}".format(path))
return data
def sftp_write_file(session, origin, destination):
logger.debug("Writing {} to {}".format(origin, destination))
sftp = session.sftp_init()
mode = LIBSSH2_SFTP_S_IRUSR | \
LIBSSH2_SFTP_S_IWUSR | \
LIBSSH2_SFTP_S_IRGRP | \
LIBSSH2_SFTP_S_IROTH
f_flags = LIBSSH2_FXF_CREAT | LIBSSH2_FXF_WRITE
with open(origin, 'rb') as local_fh, \
sftp.open(destination, f_flags, mode) as remote_fh:
for data in local_fh:
remote_fh.write(data)
logger.debug("Done writing {} to {}".format(origin, destination))
def connect_to_switch(address, port=22, user="root", password=None, key=None):
logger.debug("Connecting to {}:{} as {}".format(address, port, user))
family, _, _, _, sockaddr = socket.getaddrinfo(address, port)[0]
s = socket.socket(family, socket.SOCK_STREAM)
s.connect(sockaddr)
session = Session()
session.handshake(s)
2020-09-17 14:41:43 +02:00
# Authentication
2020-08-14 13:34:51 +02:00
if key is not None:
logger.debug("Connecting with private key {}".format(key))
if os.path.isfile(key):
session.userauth_publickey_fromfile(user, key)
elif password is not None:
session.userauth_password(user, password)
logger.debug("Connecting with password")
else:
logger.critical("Provide a password or a private key")
exit()
return session
def get_shell(session):
chan = session.open_session()
chan.shell()
get_output(chan)
get_output(chan)
chan.write("\n") # pass the press Enter part.
get_output(chan) # random stuff.
# Now we can start interacting with the chan
return chan
if __name__ == "__main__":
session = connect_to_switch("batb-5.switches.crans.org", key="/root/.ssh/id_rsa")
#sftp_write_file(session, "config.bak", "cfg/startup-config")
config = sftp_read_file(session, "cfg/running-config")
#with open("config.bak", "wb") as config_bak:
# config_bak.write(config)
print(config.decode("utf-8"))
#print(run_command(chan, "show run"))
#chan.close()