tests: MACsec PSK local failures in CP state machine

Signed-off-by: Jouni Malinen <j@w1.fi>
This commit is contained in:
Jouni Malinen 2018-12-27 12:25:37 +02:00
parent 1cb5082567
commit 344929a9ca

View file

@ -13,7 +13,7 @@ import time
from wpasupplicant import WpaSupplicant from wpasupplicant import WpaSupplicant
import hwsim_utils import hwsim_utils
from utils import HwsimSkip from utils import HwsimSkip, alloc_fail, fail_test, wait_fail_trigger
def cleanup_macsec(): def cleanup_macsec():
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
@ -150,15 +150,50 @@ def log_ip_link():
cmd.stdout.close() cmd.stdout.close()
logger.info("ip link:\n" + res) logger.info("ip link:\n" + res)
def run_macsec_psk(dev, apdev, params, prefix, integ_only=False, port0=None, def add_veth():
port1=None, ckn0=None, ckn1=None, cak0=None, cak1=None,
expect_failure=False):
try: try:
subprocess.check_call([ "ip", "link", "add", "veth0", "type", "veth", subprocess.check_call([ "ip", "link", "add", "veth0", "type", "veth",
"peer", "name", "veth1" ]) "peer", "name", "veth1" ])
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
raise HwsimSkip("veth not supported (kernel CONFIG_VETH)") raise HwsimSkip("veth not supported (kernel CONFIG_VETH)")
def add_wpas_interfaces(count=2):
wpa = []
try:
for i in range(count):
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
wpas.interface_add("veth%d" % i, driver="macsec_linux")
wpa.append(wpas)
except Exception, e:
if "Failed to add a dynamic wpa_supplicant interface" in str(e):
raise HwsimSkip("macsec supported (wpa_supplicant CONFIG_MACSEC, CONFIG_MACSEC_LINUX; kernel CONFIG_MACSEC)")
raise
return wpa
def wait_key_distribution(wpas0, wpas1, expect_failure=False):
max_iter = 14 if expect_failure else 40
for i in range(max_iter):
key_tx0 = int(wpas0.get_status_field("Number of Keys Distributed"))
key_rx0 = int(wpas0.get_status_field("Number of Keys Received"))
key_tx1 = int(wpas1.get_status_field("Number of Keys Distributed"))
key_rx1 = int(wpas1.get_status_field("Number of Keys Received"))
if (key_tx0 > 0 or key_rx0 > 0) and (key_tx1 > 0 or key_rx1 > 0):
return
time.sleep(0.5)
if expect_failure:
if key_tx0 != 0 or key_rx0 != 0 or key_tx1 != 0 or key_rx1 != 0:
raise Exception("Unexpected key distribution")
return
raise Exception("No key distribution seen")
def run_macsec_psk(dev, apdev, params, prefix, integ_only=False, port0=None,
port1=None, ckn0=None, ckn1=None, cak0=None, cak1=None,
expect_failure=False):
add_veth()
cap_veth0 = os.path.join(params['logdir'], prefix + ".veth0.pcap") cap_veth0 = os.path.join(params['logdir'], prefix + ".veth0.pcap")
cap_veth1 = os.path.join(params['logdir'], prefix + ".veth1.pcap") cap_veth1 = os.path.join(params['logdir'], prefix + ".veth1.pcap")
cap_macsec0 = os.path.join(params['logdir'], prefix + ".macsec0.pcap") cap_macsec0 = os.path.join(params['logdir'], prefix + ".macsec0.pcap")
@ -178,16 +213,9 @@ def run_macsec_psk(dev, apdev, params, prefix, integ_only=False, port0=None,
'--immediate-mode'], '--immediate-mode'],
stderr=open('/dev/null', 'w')) stderr=open('/dev/null', 'w'))
wpas0 = WpaSupplicant(global_iface='/tmp/wpas-wlan5') wpa = add_wpas_interfaces()
try: wpas0 = wpa[0]
wpas0.interface_add("veth0", driver="macsec_linux") wpas1 = wpa[1]
except Exception, e:
if "Failed to add a dynamic wpa_supplicant interface" in str(e):
raise HwsimSkip("macsec supported (wpa_supplicant CONFIG_MACSEC, CONFIG_MACSEC_LINUX; kernel CONFIG_MACSEC)")
raise
wpas1 = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
wpas1.interface_add("veth1", driver="macsec_linux")
set_mka_psk_config(wpas0, integ_only=integ_only, port=port0, ckn=ckn0, set_mka_psk_config(wpas0, integ_only=integ_only, port=port0, ckn=ckn0,
cak=cak0) cak=cak0)
@ -204,18 +232,9 @@ def run_macsec_psk(dev, apdev, params, prefix, integ_only=False, port0=None,
macsec_ifname0 = wpas0.get_driver_status_field("parent_ifname") macsec_ifname0 = wpas0.get_driver_status_field("parent_ifname")
macsec_ifname1 = wpas1.get_driver_status_field("parent_ifname") macsec_ifname1 = wpas1.get_driver_status_field("parent_ifname")
for i in range(10): wait_key_distribution(wpas0, wpas1, expect_failure=expect_failure)
key_tx0 = int(wpas0.get_status_field("Number of Keys Distributed"))
key_rx0 = int(wpas0.get_status_field("Number of Keys Received"))
key_tx1 = int(wpas1.get_status_field("Number of Keys Distributed"))
key_rx1 = int(wpas1.get_status_field("Number of Keys Received"))
if key_rx0 > 0 and key_tx1 > 0:
break
time.sleep(1)
if expect_failure: if expect_failure:
if key_tx0 != 0 or key_rx0 != 0 or key_tx1 != 0 or key_rx1 != 0:
raise Exception("Unexpected key distribution")
for i in range(len(cmd)): for i in range(len(cmd)):
cmd[i].terminate() cmd[i].terminate()
return return
@ -470,3 +489,31 @@ def run_macsec_psk_ns(dev, apdev, params):
time.sleep(1) time.sleep(1)
for i in range(len(cmd)): for i in range(len(cmd)):
cmd[i].terminate() cmd[i].terminate()
def test_macsec_psk_fail_cp(dev, apdev):
"""MACsec PSK local failures in CP state machine"""
try:
add_veth()
wpa = add_wpas_interfaces()
set_mka_psk_config(wpa[0])
with alloc_fail(wpa[0], 1, "sm_CP_RECEIVE_Enter"):
set_mka_psk_config(wpa[1])
wait_fail_trigger(wpa[0], "GET_ALLOC_FAIL", max_iter=100)
wait_key_distribution(wpa[0], wpa[1])
finally:
cleanup_macsec()
def test_macsec_psk_fail_cp2(dev, apdev):
"""MACsec PSK local failures in CP state machine (2)"""
try:
add_veth()
wpa = add_wpas_interfaces()
set_mka_psk_config(wpa[0])
with alloc_fail(wpa[1], 1, "ieee802_1x_cp_sm_init"):
set_mka_psk_config(wpa[1])
wait_fail_trigger(wpa[1], "GET_ALLOC_FAIL", max_iter=100)
wait_key_distribution(wpa[0], wpa[1])
finally:
cleanup_macsec()