tests: Add remote directory to tests

Add tests/remote directory and files:
config.py - handle devices/setup_params table
run-tests.py - run test cases
test_devices.py - run basic configuration tests

You can add own configuration file, by default this is cfg.py, and put
there devices and setup_params definition in format you can find in
config.py file. You can use -c option or just create cfg.py file.

Print available devices/test_cases:
./run-tests.py

Check devices (ssh connection, authorized_keys, interfaces):
./run-test.py -t devices

Run sanity tests (test_sanity_*):
./run-test.py -d <dut_name> -t sanity

Run all tests:
./run-tests.py -d <dut_name> -t all

Run test_A and test_B:
./run-tests.py -d <dut_name> -t "test_A, test_B"

Set reference device, and run sanity tests:
./run-tests.py -d <dut_name> -r <ref_name> -t sanity

Multiple duts/refs/monitors could be setup:
e.g.
./run-tests.py -d <dut_name> -r <ref1_name> -r <ref2_name> -t sanity

Monitor could be set like this:
./run-tests.py -d <dut_name> -t sanity -m all -m <standalone_monitor>

You can also add filters to tests you would like to run
./run-tests.py -d <dut_name> -t all -k wep -k g_only
./run-tests.py -d <dut_name> -t all -k VHT80

./run-test.py doesn't start/terminate wpa_supplicant or hostpad,
test cases are resposible for that, while we don't know test
case requirements.

Restart (-R) trace (-T) and perf (-P) options available.
This request trace/perf logs from the hosts (if possible).

As parameters each test case get:
- devices - table of available devices
- setup_params
- duts - names of DUTs should be tested
- refs - names of reference devices should be used
- monitors - names of monitors list

Each test could return append_text.

Signed-off-by: Janusz Dziedzic <janusz.dziedzic@tieto.com>
master
Janusz Dziedzic 8 years ago committed by Jouni Malinen
parent 6a003eb2a6
commit 5865186e31

@ -0,0 +1,81 @@
# Environment configuration
# Copyright (c) 2016, Tieto Corporation
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
#
# Currently static definition, in the future this could be a config file,
# or even common database with host management.
#
import logging
logger = logging.getLogger()
#
# You can put your settings in cfg.py file with setup_params, devices
# definitions in the format as below. In other case HWSIM cfg will be used.
#
setup_params = { "setup_hw" : "./tests/setup_hw.sh",
"hostapd" : "./tests/hostapd",
"wpa_supplicant" : "./tests/wpa_supplicant",
"iperf" : "iperf",
"country" : "US",
"log_dir" : "/tmp/",
"ipv4_test_net" : "192.168.12.0",
"trace_start" : "./tests/trace_start.sh",
"trace_stop" : "./tests/trace_stop.sh",
"perf_start" : "./tests/perf_start.sh",
"perf_stop" : "./tests/perf_stop.sh" }
#
#devices = [{"hostname": "192.168.254.58", "ifname" : "wlan0", "port": "9877", "name" : "t2-ath9k", "flags" : "AP_HT40 STA_HT40"},
# {"hostname": "192.168.254.58", "ifname" : "wlan1", "port": "9877", "name" : "t2-ath10k", "flags" : "AP_VHT80"},
# {"hostname": "192.168.254.58", "ifname" : "wlan3", "port": "9877", "name" : "t2-intel7260", "flags" : "STA_VHT80"},
# {"hostname": "192.168.254.50", "ifname" : "wlan0", "port": "9877", "name" : "t1-ath9k"},
# {"hostname": "192.168.254.50", "ifname" : "wlan1", "port": "9877", "name" : "t1-ath10k"}]
#
# HWSIM - ifaces available after modprobe mac80211_hwsim
#
devices = [{"hostname": "localhost", "ifname": "wlan0", "port": "9868", "name": "hwsim0", "flags": "AP_VHT80 STA_VHT80"},
{"hostname": "localhost", "ifname": "wlan1", "port": "9878", "name": "hwsim1", "flags": "AP_VHT80 STA_VHT80"},
{"hostname": "localhost", "ifname": "wlan2", "port": "9888", "name": "hwsim2", "flags": "AP_VHT80 STA_VHT80"},
{"hostname": "localhost", "ifname": "wlan3", "port": "9898", "name": "hwsim3", "flags": "AP_VHT80 STA_VHT80"}]
def get_setup_params(filename="cfg.py"):
try:
mod = __import__(filename.split(".")[0])
return mod.setup_params
except:
logger.debug("__import__(" + filename + ") failed, using static settings")
pass
return setup_params
def get_devices(filename="cfg.py"):
try:
mod = __import__(filename.split(".")[0])
return mod.devices
except:
logger.debug("__import__(" + filename + ") failed, using static settings")
pass
return devices
def get_device(devices, name=None, flags=None, lock=False):
if name is None and flags is None:
raise Exception("Failed to get device")
for device in devices:
if device['name'] == name:
return device
for device in devices:
try:
device_flags = device['flags']
if device_flags.find(flags) != -1:
return device
except:
pass
raise Exception("Failed to get device " + name)
def put_device(devices, name):
pass

@ -0,0 +1,288 @@
#!/usr/bin/env python2
#
# Remote test case executor
# Copyright (c) 2016, Tieto Corporation
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
import os
import re
import sys
import time
import traceback
import getopt
from datetime import datetime
import logging
logger = logging.getLogger()
scriptsdir = os.path.dirname(os.path.realpath(sys.modules[__name__].__file__))
sys.path.append(os.path.join(scriptsdir, '..', '..', 'wpaspy'))
sys.path.append(os.path.join(scriptsdir, '..', 'hwsim'))
import wpaspy
import config
from test_devices import show_devices
from test_devices import check_devices
def usage():
print "USAGE: " + sys.argv[0] + " -t devices"
print "USAGE: " + sys.argv[0] + " -t check_devices"
print "USAGE: " + sys.argv[0] + " -d <dut_name> -t <all|sanity|tests_to_run> [-r <ref_name>] [-c <cfg_file.py>] [-m <all|monitor_name>] [-R][-T][-P][-v]"
print "USAGE: " + sys.argv[0]
def get_devices(devices, duts, refs, monitors):
for dut in duts:
config.get_device(devices, dut, lock=True)
for ref in refs:
config.get_device(devices, ref, lock=True)
for monitor in monitors:
if monitor == "all":
continue
if monitor in duts:
continue
if monitor in refs:
continue
config.get_device(devices, monitor, lock=True)
def put_devices(devices, duts, refs, monitors):
for dut in duts:
config.put_device(devices, dut)
for ref in refs:
config.put_device(devices, ref)
for monitor in monitors:
if monitor == "all":
continue
if monitor in duts:
continue
if monitor in refs:
continue
config.put_device(devices, monitor)
def main():
duts = []
refs = []
monitors = []
filter_keys = []
requested_tests = ["help"]
cfg_file = "cfg.py"
log_dir = "./logs/"
verbose = False
trace = False
restart = False
perf = False
# parse input parameters
try:
opts, args = getopt.getopt(sys.argv[1:], "d:r:t:l:k:c:m:vRPT",
["dut=", "ref=", "tests=", "log-dir=",
"cfg=", "key=", "monitor="])
except getopt.GetoptError as err:
print(err)
usage()
sys.exit(2)
for option, argument in opts:
if option == "-v":
verbose = True
elif option == "-R":
restart = True
elif option == "-T":
trace = True
elif option == "-P":
perf = True
elif option in ("-d", "--dut"):
duts.append(argument)
elif option in ("-r", "--ref"):
refs.append(argument)
elif option in ("-t", "--tests"):
requested_tests = re.split('; | |, ', argument)
elif option in ("-l", "--log-dir"):
log_dir = argument
elif option in ("-k", "--key"):
filter_keys.append(argument)
elif option in ("-m", "--monitor"):
monitors.append(argument)
elif option in ("-c", "--cfg"):
cfg_file = argument
else:
assert False, "unhandled option"
# get env configuration
setup_params = config.get_setup_params(cfg_file)
devices = config.get_devices(cfg_file)
# put logs in log_dir
symlink = os.path.join(log_dir, "current");
if os.path.exists(symlink):
os.unlink(symlink)
log_dir = os.path.join(log_dir, time.strftime("%Y_%m_%d_%H_%M_%S"))
if not os.path.exists(log_dir):
os.makedirs(log_dir)
os.symlink(os.path.join("../", log_dir), symlink)
# setup restart/trace/perf request
setup_params['local_log_dir'] = log_dir
setup_params['restart_device'] = restart
setup_params['trace'] = trace
setup_params['perf'] = perf
# configure logger
logger.setLevel(logging.DEBUG)
stdout_handler = logging.StreamHandler()
stdout_handler.setLevel(logging.WARNING)
if verbose:
stdout_handler.setLevel(logging.DEBUG)
logger.addHandler(stdout_handler)
formatter = logging.Formatter('%(asctime)s - %(message)s')
file_name = os.path.join(log_dir, 'run-tests.log')
log_handler = logging.FileHandler(file_name)
log_handler.setLevel(logging.DEBUG)
log_handler.setFormatter(formatter)
logger.addHandler(log_handler)
# import available tests
tests = []
failed = []
test_modules = []
files = os.listdir(scriptsdir)
for t in files:
m = re.match(r'(test_.*)\.py$', t)
if m:
mod = __import__(m.group(1))
test_modules.append(mod.__name__.replace('test_', '', 1))
for key,val in mod.__dict__.iteritems():
if key.startswith("test_"):
tests.append(val)
test_names = list(set([t.__name__.replace('test_', '', 1) for t in tests]))
# sort the list
test_names.sort()
tests.sort()
# print help
if requested_tests[0] == "help":
usage()
print "\nAvailable Devices:"
for device in devices:
print "\t", device['name']
print "\nAvailable tests:"
for test in test_names:
print "\t", test
return
# show/check devices
if requested_tests[0] == "devices":
show_devices(devices, setup_params)
return
# apply filters
for filter_key in filter_keys:
filtered_tests = []
for test in tests:
if re.search(filter_key, test.__name__):
filtered_tests.append(test)
tests = filtered_tests
# setup test we should run
tests_to_run = []
if requested_tests[0] == "all":
tests_to_run = tests
elif requested_tests[0] == "sanity":
for test in tests:
if test.__name__.startswith("test_sanity_"):
tests_to_run.append(test)
else:
for test in requested_tests:
t = None
for tt in tests:
name = tt.__name__.replace('test_', '', 1)
if name == test:
t = tt
break
if not t:
logger.warning("test case: " + test + " NOT-FOUND")
continue
tests_to_run.append(t)
# lock devices
try:
get_devices(devices, duts, refs, monitors)
except Exception, e:
logger.warning("get devices failed: " + str(e))
logger.info(traceback.format_exc())
put_devices(devices, duts, refs, monitors)
return
except:
logger.warning("get devices failed")
logger.info(traceback.format_exc())
put_devices(devices, duts, refs, monitors)
return
# now run test cases
for dut in duts:
logger.warning("DUT: " + str(dut))
for ref in refs:
logger.warning("REF: " + str(ref))
for monitor in monitors:
logger.warning("MON: " + str(monitor))
# run check_devices at begining
logger.warning("RUN check_devices")
try:
check_devices(devices, setup_params, refs, duts, monitors)
except Exception, e:
logger.warning("FAILED: " + str(e))
logger.info(traceback.format_exc())
put_devices(devices, duts, refs, monitors)
return
except:
logger.warning("FAILED")
logger.info(traceback.format_exc())
put_devices(devices, duts, refs, monitors)
return
logger.warning("PASS")
test_no = 1
for test in tests_to_run:
try:
start = datetime.now()
setup_params['tc_name'] = test.__name__.replace('test_', '', 1)
logger.warning("START - " + setup_params['tc_name'] + " (" + str(test_no) + "/" + str(len(tests_to_run)) + ")")
if test.__doc__:
logger.info("Test: " + test.__doc__)
# run tc
res = test(devices, setup_params, refs, duts, monitors)
end = datetime.now()
logger.warning("PASS (" + res + ") - " + str((end - start).total_seconds()) + "s")
except KeyboardInterrupt:
put_devices(devices, duts, refs, monitors)
raise
except Exception, e:
end = datetime.now()
logger.warning("FAILED (" + str(e) + ") - " + str((end - start).total_seconds()) + "s")
logger.info(traceback.format_exc())
failed.append(test.__name__.replace('test_', '', 1))
except:
end = datetime.now()
logger.warning("FAILED - " + str((end - start).total_seconds()) + "s")
logger.info(traceback.format_exc())
failed.append(test.__name__.replace('test_', '', 1))
test_no += 1
# unlock devices
put_devices(devices, duts, refs, monitors)
if len(failed) > 0:
logger.warning("Failed test cases:")
for test in failed:
logger.warning("\t" + test)
if __name__ == "__main__":
main()

@ -0,0 +1,143 @@
#!/usr/bin/env python2
#
# Show/check devices
# Copyright (c) 2016, Tieto Corporation
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
import time
import traceback
import config
import os
import sys
import getopt
import re
import logging
logger = logging.getLogger()
import utils
from remotehost import Host
from wpasupplicant import WpaSupplicant
import hostapd
def show_devices(devices, setup_params):
"""Show/check available devices"""
print "Devices:"
for device in devices:
dev = config.get_device(devices, device['name'])
host = Host(host = dev['hostname'], ifname = dev['ifname'],
port = dev['port'], name = dev['name'])
# simple check if authorized_keys works correctly
status, buf = host.execute(["id"])
if status != 0:
print "[" + host.name + "] - ssh communication: FAILED"
continue
else:
print "[" + host.name + "] - ssh communication: OK"
# check setup_hw works correctly
try:
setup_hw = setup_params['setup_hw']
try:
restart_device = setup_params['restart_device']
except:
restart_device = "0"
host.execute([setup_hw, "-I", host.ifname, "-R", restart_device])
except:
pass
# show uname
status, buf = host.execute(["uname", "-s", "-n", "-r", "-m", "-o"])
print "\t" + buf
# show ifconfig
status, buf = host.execute(["ifconfig", host.ifname])
if status != 0:
print "\t" + host.ifname + " failed\n"
continue
lines = buf.splitlines()
for line in lines:
print "\t" + line
# check hostapd, wpa_supplicant, iperf exist
status, buf = host.execute([setup_params['wpa_supplicant'], "-v"])
if status != 0:
print "\t" + setup_params['wpa_supplicant'] + " not find\n"
continue
lines = buf.splitlines()
for line in lines:
print "\t" + line
print ""
status, buf = host.execute([setup_params['hostapd'], "-v"])
if status != 1:
print "\t" + setup_params['hostapd'] + " not find\n"
continue
lines = buf.splitlines()
for line in lines:
print "\t" + line
print ""
status, buf = host.execute([setup_params['iperf'], "-v"])
if status != 0 and status != 1:
print "\t" + setup_params['iperf'] + " not find\n"
continue
lines = buf.splitlines()
for line in lines:
print "\t" + line
print ""
def check_device(devices, setup_params, dev_name, monitor=False):
dev = config.get_device(devices, dev_name)
host = Host(host = dev['hostname'], ifname = dev['ifname'],
port = dev['port'], name = dev['name'])
# simple check if authorized_keys works correctly
status, buf = host.execute(["id"])
if status != 0:
raise Exception(dev_name + " - ssh communication FAILED: " + buf)
ifaces = re.split('; | |, ', host.ifname)
# try to setup host/ifaces
for iface in ifaces:
try:
setup_hw = setup_params['setup_hw']
try:
restart_device = setup_params['restart_device']
except:
restart_device = "0"
host.execute(setup_hw + " -I " + iface + " -R " + restart_device)
except:
pass
# check interfaces (multi for monitor)
for iface in ifaces:
status, buf = host.execute(["ifconfig", iface])
if status != 0:
raise Exception(dev_name + " ifconfig " + iface + " failed: " + buf)
# monitor doesn't need wpa_supplicant/hostapd ...
if monitor == True:
return
status, buf = host.execute(["ls", "-l", setup_params['wpa_supplicant']])
if status != 0:
raise Exception(dev_name + " - wpa_supplicant: " + buf)
status, buf = host.execute(["ls", "-l", setup_params['hostapd']])
if status != 0:
raise Exception(dev_name + " - hostapd: " + buf)
status, buf = host.execute(["which", setup_params['iperf']])
if status != 0:
raise Exception(dev_name + " - hostapd: " + buf)
status, buf = host.execute(["which", "tshark"])
if status != 0:
logger.debug(dev_name + " - hostapd: " + buf)
def check_devices(devices, setup_params, refs, duts, monitors):
"""Check duts/refs/monitors devices"""
for dut in duts:
check_device(devices, setup_params, dut)
for ref in refs:
check_device(devices, setup_params, ref)
for monitor in monitors:
if monitor == "all":
continue
check_device(devices, setup_params, monitor, monitor=True)
Loading…
Cancel
Save