From 344929a9ca57b5c8cf557ae43648c4374d7f8ff5 Mon Sep 17 00:00:00 2001 From: Jouni Malinen Date: Thu, 27 Dec 2018 12:25:37 +0200 Subject: [PATCH] tests: MACsec PSK local failures in CP state machine Signed-off-by: Jouni Malinen --- tests/hwsim/test_macsec.py | 95 ++++++++++++++++++++++++++++---------- 1 file changed, 71 insertions(+), 24 deletions(-) diff --git a/tests/hwsim/test_macsec.py b/tests/hwsim/test_macsec.py index f1d6d2284..ce5ee9f28 100644 --- a/tests/hwsim/test_macsec.py +++ b/tests/hwsim/test_macsec.py @@ -13,7 +13,7 @@ import time from wpasupplicant import WpaSupplicant import hwsim_utils -from utils import HwsimSkip +from utils import HwsimSkip, alloc_fail, fail_test, wait_fail_trigger def cleanup_macsec(): wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') @@ -150,15 +150,50 @@ def log_ip_link(): cmd.stdout.close() logger.info("ip link:\n" + res) -def run_macsec_psk(dev, apdev, params, prefix, integ_only=False, port0=None, - port1=None, ckn0=None, ckn1=None, cak0=None, cak1=None, - expect_failure=False): +def add_veth(): try: subprocess.check_call([ "ip", "link", "add", "veth0", "type", "veth", "peer", "name", "veth1" ]) except subprocess.CalledProcessError: raise HwsimSkip("veth not supported (kernel CONFIG_VETH)") +def add_wpas_interfaces(count=2): + wpa = [] + try: + for i in range(count): + wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') + wpas.interface_add("veth%d" % i, driver="macsec_linux") + wpa.append(wpas) + except Exception, e: + if "Failed to add a dynamic wpa_supplicant interface" in str(e): + raise HwsimSkip("macsec supported (wpa_supplicant CONFIG_MACSEC, CONFIG_MACSEC_LINUX; kernel CONFIG_MACSEC)") + raise + + return wpa + +def wait_key_distribution(wpas0, wpas1, expect_failure=False): + max_iter = 14 if expect_failure else 40 + for i in range(max_iter): + key_tx0 = int(wpas0.get_status_field("Number of Keys Distributed")) + key_rx0 = int(wpas0.get_status_field("Number of Keys Received")) + key_tx1 = int(wpas1.get_status_field("Number of Keys Distributed")) + key_rx1 = int(wpas1.get_status_field("Number of Keys Received")) + if (key_tx0 > 0 or key_rx0 > 0) and (key_tx1 > 0 or key_rx1 > 0): + return + time.sleep(0.5) + + if expect_failure: + if key_tx0 != 0 or key_rx0 != 0 or key_tx1 != 0 or key_rx1 != 0: + raise Exception("Unexpected key distribution") + return + + raise Exception("No key distribution seen") + +def run_macsec_psk(dev, apdev, params, prefix, integ_only=False, port0=None, + port1=None, ckn0=None, ckn1=None, cak0=None, cak1=None, + expect_failure=False): + add_veth() + cap_veth0 = os.path.join(params['logdir'], prefix + ".veth0.pcap") cap_veth1 = os.path.join(params['logdir'], prefix + ".veth1.pcap") cap_macsec0 = os.path.join(params['logdir'], prefix + ".macsec0.pcap") @@ -178,16 +213,9 @@ def run_macsec_psk(dev, apdev, params, prefix, integ_only=False, port0=None, '--immediate-mode'], stderr=open('/dev/null', 'w')) - wpas0 = WpaSupplicant(global_iface='/tmp/wpas-wlan5') - try: - wpas0.interface_add("veth0", driver="macsec_linux") - except Exception, e: - if "Failed to add a dynamic wpa_supplicant interface" in str(e): - raise HwsimSkip("macsec supported (wpa_supplicant CONFIG_MACSEC, CONFIG_MACSEC_LINUX; kernel CONFIG_MACSEC)") - raise - - wpas1 = WpaSupplicant(global_iface='/tmp/wpas-wlan5') - wpas1.interface_add("veth1", driver="macsec_linux") + wpa = add_wpas_interfaces() + wpas0 = wpa[0] + wpas1 = wpa[1] set_mka_psk_config(wpas0, integ_only=integ_only, port=port0, ckn=ckn0, cak=cak0) @@ -204,18 +232,9 @@ def run_macsec_psk(dev, apdev, params, prefix, integ_only=False, port0=None, macsec_ifname0 = wpas0.get_driver_status_field("parent_ifname") macsec_ifname1 = wpas1.get_driver_status_field("parent_ifname") - for i in range(10): - key_tx0 = int(wpas0.get_status_field("Number of Keys Distributed")) - key_rx0 = int(wpas0.get_status_field("Number of Keys Received")) - key_tx1 = int(wpas1.get_status_field("Number of Keys Distributed")) - key_rx1 = int(wpas1.get_status_field("Number of Keys Received")) - if key_rx0 > 0 and key_tx1 > 0: - break - time.sleep(1) + wait_key_distribution(wpas0, wpas1, expect_failure=expect_failure) if expect_failure: - if key_tx0 != 0 or key_rx0 != 0 or key_tx1 != 0 or key_rx1 != 0: - raise Exception("Unexpected key distribution") for i in range(len(cmd)): cmd[i].terminate() return @@ -470,3 +489,31 @@ def run_macsec_psk_ns(dev, apdev, params): time.sleep(1) for i in range(len(cmd)): cmd[i].terminate() + +def test_macsec_psk_fail_cp(dev, apdev): + """MACsec PSK local failures in CP state machine""" + try: + add_veth() + wpa = add_wpas_interfaces() + set_mka_psk_config(wpa[0]) + with alloc_fail(wpa[0], 1, "sm_CP_RECEIVE_Enter"): + set_mka_psk_config(wpa[1]) + wait_fail_trigger(wpa[0], "GET_ALLOC_FAIL", max_iter=100) + + wait_key_distribution(wpa[0], wpa[1]) + finally: + cleanup_macsec() + +def test_macsec_psk_fail_cp2(dev, apdev): + """MACsec PSK local failures in CP state machine (2)""" + try: + add_veth() + wpa = add_wpas_interfaces() + set_mka_psk_config(wpa[0]) + with alloc_fail(wpa[1], 1, "ieee802_1x_cp_sm_init"): + set_mka_psk_config(wpa[1]) + wait_fail_trigger(wpa[1], "GET_ALLOC_FAIL", max_iter=100) + + wait_key_distribution(wpa[0], wpa[1]) + finally: + cleanup_macsec()