tests: Use function decorator to clean up --long processing

Signed-off-by: Jouni Malinen <j@w1.fi>
master
Jouni Malinen 4 years ago
parent b5bf18768f
commit 7e88ed8e2d

@ -187,6 +187,18 @@ def rename_log(logdir, basename, testname, dev):
logger.info("Failed to rename log files")
logger.info(e)
def is_long_duration_test(t):
return hasattr(t, "long_duration_test") and t.long_duration_test
def get_test_description(t):
if t.__doc__ is None:
desc = "MISSING DESCRIPTION"
else:
desc = t.__doc__
if is_long_duration_test(t):
desc += " [long]"
return desc
def main():
tests = []
test_modules = []
@ -319,13 +331,10 @@ def main():
if args.update_tests_db:
for t in tests_to_run:
name = t.__name__.replace('test_', '', 1)
if t.__doc__ is None:
print(name + " - MISSING DESCRIPTION")
else:
print(name + " - " + t.__doc__)
print(name + " - " + get_test_description(t))
if conn:
sql = 'INSERT OR REPLACE INTO tests(test,description) VALUES (?, ?)'
params = (name, t.__doc__)
params = (name, get_test_description(t))
try:
conn.execute(sql, params)
except Exception as e:
@ -512,10 +521,11 @@ def main():
sys.exit(1)
skip_reason = None
try:
if is_long_duration_test(t) and not args.long:
raise HwsimSkip("Skip test case with long duration due to --long not specified")
if t.__code__.co_argcount > 2:
params = {}
params['logdir'] = args.logdir
params['long'] = args.long
params['name'] = name
params['prefix'] = os.path.join(args.logdir, name)
t(dev, apdev, params)

@ -9,7 +9,7 @@ logger = logging.getLogger()
import time
import hostapd
from utils import skip_with_fips, alloc_fail, fail_test, HwsimSkip, clear_regdom
from utils import *
from test_ap_ht import clear_scan_cache
from test_dfs import wait_dfs_event
from test_sae import check_sae_capab
@ -390,10 +390,9 @@ def test_ap_acs_errors(dev, apdev):
if not ev:
raise Exception("ACS start timed out")
def test_ap_acs_dfs(dev, apdev, params):
"""Automatic channel selection, HT scan, and DFS [long]"""
if not params['long']:
raise HwsimSkip("Skip test case with long duration due to --long not specified")
@long_duration_test
def test_ap_acs_dfs(dev, apdev):
"""Automatic channel selection, HT scan, and DFS"""
try:
hapd = None
force_prev_ap_on_5g(apdev[0])
@ -472,10 +471,9 @@ def test_ap_acs_exclude_dfs(dev, apdev, params):
dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.5)
dev[0].flush_scan_cache()
def test_ap_acs_vht160_dfs(dev, apdev, params):
"""Automatic channel selection 160 MHz, HT scan, and DFS [long]"""
if not params['long']:
raise HwsimSkip("Skip test case with long duration due to --long not specified")
@long_duration_test
def test_ap_acs_vht160_dfs(dev, apdev):
"""Automatic channel selection 160 MHz, HT scan, and DFS"""
try:
hapd = None
force_prev_ap_on_5g(apdev[0])

@ -201,10 +201,9 @@ def test_ap_cipher_tkip_countermeasures_sta(dev, apdev):
if ev is not None:
raise Exception("Unexpected connection during TKIP countermeasures")
def test_ap_cipher_tkip_countermeasures_sta2(dev, apdev, params):
"""WPA-PSK/TKIP countermeasures (detected by two STAs) [long]"""
if not params['long']:
raise HwsimSkip("Skip test case with long duration due to --long not specified")
@long_duration_test
def test_ap_cipher_tkip_countermeasures_sta2(dev, apdev):
"""WPA-PSK/TKIP countermeasures (detected by two STAs)"""
skip_with_fips(dev[0])
skip_without_tkip(dev[0])
params = {"ssid": "tkip-countermeasures",

@ -13,7 +13,7 @@ import os
import hwsim_utils
import hostapd
from utils import alloc_fail, require_under_vm, get_phy
from utils import *
from test_ap_acs import force_prev_ap_on_24g
@remote_compatible

@ -3026,10 +3026,9 @@ def test_ap_wpa2_eap_eke(dev, apdev):
eap_connect(dev[0], hapd, "EKE", "eke user", password="hello1",
expect_failure=True)
def test_ap_wpa2_eap_eke_many(dev, apdev, params):
"""WPA2-Enterprise connection using EAP-EKE (many connections) [long]"""
if not params['long']:
raise HwsimSkip("Skip test case with long duration due to --long not specified")
@long_duration_test
def test_ap_wpa2_eap_eke_many(dev, apdev):
"""WPA2-Enterprise connection using EAP-EKE (many connections)"""
params = hostapd.wpa2_eap_params(ssid="test-wpa2-eap")
hostapd.add_ap(apdev[0], params)
success = 0

@ -2513,10 +2513,9 @@ def test_ap_wps_auto_setup_with_config_file(dev, apdev):
except:
pass
def test_ap_wps_pbc_timeout(dev, apdev, params):
"""wpa_supplicant PBC walk time and WPS ER SelReg timeout [long]"""
if not params['long']:
raise HwsimSkip("Skip test case with long duration due to --long not specified")
@long_duration_test
def test_ap_wps_pbc_timeout(dev, apdev):
"""wpa_supplicant PBC walk time and WPS ER SelReg timeout"""
ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
hapd = add_ssdp_ap(apdev[0], ap_uuid)
@ -10422,16 +10421,14 @@ def test_ap_wps_appl_ext(dev, apdev):
dev[0].request("WPS_PIN %s %s" % (apdev[0]['bssid'], pin))
dev[0].wait_connected(timeout=30)
def test_ap_wps_pbc_ap_timeout(dev, apdev, params):
"""WPS PBC timeout on AP [long]"""
if not params['long']:
raise HwsimSkip("Skip test case with long duration due to --long not specified")
@long_duration_test
def test_ap_wps_pbc_ap_timeout(dev, apdev):
"""WPS PBC timeout on AP"""
run_ap_wps_ap_timeout(dev, apdev, "WPS_PBC")
def test_ap_wps_pin_ap_timeout(dev, apdev, params):
"""WPS PIN timeout on AP [long]"""
if not params['long']:
raise HwsimSkip("Skip test case with long duration due to --long not specified")
@long_duration_test
def test_ap_wps_pin_ap_timeout(dev, apdev):
"""WPS PIN timeout on AP"""
run_ap_wps_ap_timeout(dev, apdev, "WPS_PIN any 12345670 10")
def run_ap_wps_ap_timeout(dev, apdev, cmd):

@ -135,10 +135,9 @@ def test_dfs(dev, apdev):
finally:
clear_regdom(hapd, dev)
def test_dfs_etsi(dev, apdev, params):
"""DFS and uniform spreading requirement for ETSI [long]"""
if not params['long']:
raise HwsimSkip("Skip test case with long duration due to --long not specified")
@long_duration_test
def test_dfs_etsi(dev, apdev):
"""DFS and uniform spreading requirement for ETSI"""
try:
hapd = None
hapd = start_dfs_ap(apdev[0])
@ -450,10 +449,9 @@ def test_dfs_radar_ht40minus(dev, apdev):
clear_regdom(hapd, dev)
dev[0].request("STA_AUTOCONNECT 1")
def test_dfs_ht40_minus(dev, apdev, params):
"""DFS CAC functionality on channel 104 HT40- [long]"""
if not params['long']:
raise HwsimSkip("Skip test case with long duration due to --long not specified")
@long_duration_test
def test_dfs_ht40_minus(dev, apdev):
"""DFS CAC functionality on channel 104 HT40-"""
try:
hapd = None
hapd = start_dfs_ap(apdev[0], ht40minus=True, channel=104)
@ -505,10 +503,9 @@ def test_dfs_cac_restart_on_enable(dev, apdev):
finally:
clear_regdom(hapd, dev)
def test_dfs_rrm(dev, apdev, params):
"""DFS with RRM [long]"""
if not params['long']:
raise HwsimSkip("Skip test case with long duration due to --long not specified")
@long_duration_test
def test_dfs_rrm(dev, apdev):
"""DFS with RRM"""
try:
hapd = None
hapd = start_dfs_ap(apdev[0], country="US", rrm_beacon_report=True)
@ -534,10 +531,9 @@ def test_dfs_rrm(dev, apdev, params):
finally:
clear_regdom(hapd, dev)
def test_dfs_radar_vht80_downgrade(dev, apdev, params):
"""DFS channel bandwidth downgrade from VHT80 to VHT40 [long]"""
if not params['long']:
raise HwsimSkip("Skip test case with long duration due to --long not specified")
@long_duration_test
def test_dfs_radar_vht80_downgrade(dev, apdev):
"""DFS channel bandwidth downgrade from VHT80 to VHT40"""
try:
# Start with 80 MHz channel 100 (5500 MHz) to find a radar
hapd = None
@ -609,10 +605,9 @@ def test_dfs_radar_vht80_downgrade(dev, apdev, params):
finally:
clear_regdom(hapd, dev)
def test_dfs_chan_switch(dev, apdev, params):
"""DFS channel switch [long]"""
if not params['long']:
raise HwsimSkip("Skip test case with long duration due to --long not specified")
@long_duration_test
def test_dfs_chan_switch(dev, apdev):
"""DFS channel switch"""
try:
hapd = None
hapd = start_dfs_ap(apdev[0], country="US")

@ -19,7 +19,7 @@ import time
import hostapd
import hwsim_utils
from hwsim import HWSimRadio
from utils import HwsimSkip, alloc_fail, fail_test, wait_fail_trigger
from utils import *
from wpasupplicant import WpaSupplicant
from wlantest import WlantestCapture
@ -5071,10 +5071,9 @@ def test_dpp_with_p2p_device(dev, apdev):
wait_auth_success(wpas, dev[0], configurator=dev[0], enrollee=wpas,
allow_enrollee_failure=True)
def test_dpp_chirp(dev, apdev, params):
"""DPP chirp [long]"""
if not params['long']:
raise HwsimSkip("Skip test case with long duration due to --long not specified")
@long_duration_test
def test_dpp_chirp(dev, apdev):
"""DPP chirp"""
check_dpp_capab(dev[0])
dev[0].flush_scan_cache()
@ -5114,10 +5113,9 @@ def test_dpp_chirp(dev, apdev, params):
if chan1 != 5 or chan6 != 5 or chan11 != 1:
raise Exception("Unexpected number of presence announcements sent: %d %d %d" % (chan1, chan6, chan11))
def test_dpp_chirp_listen(dev, apdev, params):
"""DPP chirp with listen [long]"""
if not params['long']:
raise HwsimSkip("Skip test case with long duration due to --long not specified")
@long_duration_test
def test_dpp_chirp_listen(dev, apdev):
"""DPP chirp with listen"""
check_dpp_capab(dev[0])
check_dpp_capab(dev[1])

@ -420,10 +420,9 @@ def test_he_40(devs, apdevs):
dev.request("DISCONNECT")
clear_regdom(hapd, devs)
def test_he160(dev, apdev, params):
"""HE with 160 MHz channel width (1) [long]"""
if not params['long']:
raise HwsimSkip("Skip test case with long duration due to --long not specified")
@long_duration_test
def test_he160(dev, apdev):
"""HE with 160 MHz channel width (1)"""
try:
hapd = None
params = {"ssid": "he",
@ -492,10 +491,9 @@ def test_he160(dev, apdev, params):
dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.5)
dev[0].flush_scan_cache()
def test_he160b(dev, apdev, params):
"""HE with 160 MHz channel width (2) [long]"""
if not params['long']:
raise HwsimSkip("Skip test case with long duration due to --long not specified")
@long_duration_test
def test_he160b(dev, apdev):
"""HE with 160 MHz channel width (2)"""
try:
hapd = None

@ -10,7 +10,7 @@ logger = logging.getLogger()
import time
import hostapd
from utils import HwsimSkip
from utils import *
def hostapd_oom_loop(apdev, params, start_func="main"):
hapd = hostapd.add_ap(apdev[0], {"ssid": "ctrl"})
@ -125,10 +125,9 @@ def test_hostapd_oom_wpa2_psk_connect(dev, apdev):
break
dev[0].request("SCAN_INTERVAL 5")
def test_hostapd_oom_wpa2_eap_connect(dev, apdev, params):
@long_duration_test
def test_hostapd_oom_wpa2_eap_connect(dev, apdev):
"""hostapd failing during WPA2-EAP mode connection due to OOM"""
if not params['long']:
raise HwsimSkip("Skip test case with long duration due to --long not specified")
params = hostapd.wpa2_eap_params(ssid="test-wpa2-eap")
params['acct_server_addr'] = "127.0.0.1"
params['acct_server_port'] = "1813"

@ -14,8 +14,7 @@ import os
import hostapd
import hwsim_utils
import utils
from utils import HwsimSkip
from utils import *
from wpasupplicant import WpaSupplicant
from p2p_utils import *
from test_p2p_messages import parse_p2p_public_action, p2p_hdr, p2p_attr_capability, p2p_attr_go_intent, p2p_attr_config_timeout, p2p_attr_listen_channel, p2p_attr_intended_interface_addr, p2p_attr_channel_list, p2p_attr_device_info, p2p_attr_operating_channel, ie_p2p, ie_wsc, mgmt_tx, P2P_GO_NEG_REQ
@ -46,7 +45,7 @@ def test_grpform_a(dev):
raise Exception("Unexpected group interface name")
check_grpform_results(i_res, r_res)
remove_group(dev[0], dev[1])
if i_res['ifname'] in utils.get_ifnames():
if i_res['ifname'] in get_ifnames():
raise Exception("Group interface netdev was not removed")
def test_grpform_b(dev):
@ -67,7 +66,7 @@ def test_grpform_b(dev):
if "FAIL" not in dev[0].group_request("P2P_GROUP_MEMBER 00:11:22:33:44:55"):
raise Exception("P2P_GROUP_MEMBER for non-member accepted")
remove_group(dev[0], dev[1])
if r_res['ifname'] in utils.get_ifnames():
if r_res['ifname'] in get_ifnames():
raise Exception("Group interface netdev was not removed")
def test_grpform_c(dev):
@ -82,9 +81,9 @@ def test_grpform_c(dev):
raise Exception("Unexpected group interface name")
check_grpform_results(i_res, r_res)
remove_group(dev[0], dev[1])
if i_res['ifname'] in utils.get_ifnames():
if i_res['ifname'] in get_ifnames():
raise Exception("Group interface netdev was not removed")
if r_res['ifname'] in utils.get_ifnames():
if r_res['ifname'] in get_ifnames():
raise Exception("Group interface netdev was not removed")
@remote_compatible
@ -99,9 +98,9 @@ def test_grpform2_c(dev):
dev[1].global_request("SET p2p_no_group_iface 0")
[i_res, r_res] = go_neg_pin_authorized(i_dev=dev[0], i_intent=0, r_dev=dev[1], r_intent=15)
remove_group(dev[0], dev[1])
if i_res['ifname'] in utils.get_ifnames():
if i_res['ifname'] in get_ifnames():
raise Exception("Group interface netdev was not removed")
if r_res['ifname'] in utils.get_ifnames():
if r_res['ifname'] in get_ifnames():
raise Exception("Group interface netdev was not removed")
@remote_compatible
@ -116,9 +115,9 @@ def test_grpform3_c(dev):
dev[1].global_request("SET p2p_no_group_iface 0")
[i_res, r_res] = go_neg_pin(i_dev=dev[0], i_intent=15, r_dev=dev[1], r_intent=0)
remove_group(dev[0], dev[1])
if i_res['ifname'] in utils.get_ifnames():
if i_res['ifname'] in get_ifnames():
raise Exception("Group interface netdev was not removed")
if r_res['ifname'] in utils.get_ifnames():
if r_res['ifname'] in get_ifnames():
raise Exception("Group interface netdev was not removed")
@remote_compatible
@ -734,11 +733,9 @@ def test_grpform_goneg_fail_with_group_iface(dev):
if ev is None:
raise Exception("GO Negotiation failure timed out")
def test_grpform_cred_ready_timeout(dev, apdev, params):
"""P2P GO Negotiation wait for credentials to become ready [long]"""
if not params['long']:
raise HwsimSkip("Skip test case with long duration due to --long not specified")
@long_duration_test
def test_grpform_cred_ready_timeout(dev):
"""P2P GO Negotiation wait for credentials to become ready"""
dev[1].p2p_listen()
addr1 = dev[1].p2p_dev_addr()
if not dev[0].discover_peer(addr1):
@ -1181,7 +1178,7 @@ def test_grpform_random_addr(dev):
check_grpform_results(i_res, r_res)
hwsim_utils.test_connectivity_p2p(dev[0], dev[1])
remove_group(dev[0], dev[1])
if i_res['ifname'] in utils.get_ifnames():
if i_res['ifname'] in get_ifnames():
raise Exception("Group interface netdev was not removed")
finally:
dev[0].global_request("SET p2p_interface_random_mac_addr 0")

@ -1294,10 +1294,9 @@ def test_wpas_mesh_password_mismatch(dev, apdev):
if count == 0:
raise Exception("Neither dev0 nor dev1 reported auth failure")
def test_wpas_mesh_password_mismatch_retry(dev, apdev, params):
"""Mesh password mismatch and retry [long]"""
if not params['long']:
raise HwsimSkip("Skip test case with long duration due to --long not specified")
@long_duration_test
def test_wpas_mesh_password_mismatch_retry(dev, apdev):
"""Mesh password mismatch and retry"""
check_mesh_support(dev[0], secure=True)
dev[0].request("SET sae_groups ")
id = add_mesh_secure_net(dev[0])

@ -30,6 +30,10 @@ class HwsimSkip(Exception):
def __str__(self):
return self.reason
def long_duration_test(func):
func.long_duration_test = True
return func
class alloc_fail(object):
def __init__(self, dev, count, funcs):
self._dev = dev

Loading…
Cancel
Save