# Test cases for MACsec/MKA
# Copyright (c) 2018, Jouni Malinen
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.

import logging
logger = logging.getLogger()
import os
import signal
import subprocess
import time

from wpasupplicant import WpaSupplicant
import hwsim_utils
from utils import HwsimSkip, alloc_fail, fail_test, wait_fail_trigger

MACSEC_SKIP_REASON = "macsec not supported (wpa_supplicant CONFIG_MACSEC, CONFIG_MACSEC_LINUX; kernel CONFIG_MACSEC)"

def _call_quiet(argv):
    """Run argv with stderr discarded and return its exit status."""
    with open(os.devnull, 'w') as devnull:
        return subprocess.call(argv, stderr=devnull)

def _cmd_output(argv, quiet=True):
    """Run argv to completion and return its stdout decoded as text.

    stderr is discarded when quiet is set and inherited otherwise. The
    child is always waited for so that it does not linger as a zombie.
    """
    with open(os.devnull, 'w') as devnull:
        proc = subprocess.Popen(argv, stdout=subprocess.PIPE,
                                stderr=devnull if quiet else None)
        out = proc.communicate()[0]
    # Decode explicitly so that the result can be concatenated with str on
    # Python 3 as well.
    return out.decode(errors="replace")

def _start_tcpdump(ifname, capfile, netns=None):
    """Start a tcpdump capture of ifname into capfile.

    When netns is given, tcpdump is started through "ip netns exec" in that
    network namespace. Returns the Popen object of the started process.
    """
    argv = ['tcpdump', '-p', '-U', '-i', ifname, '-w', capfile,
            '-s', '2000', '--immediate-mode']
    if netns is not None:
        argv = ['ip', 'netns', 'exec', netns] + argv
    # The child inherits its own copy of the descriptor, so the parent's
    # handle can be closed as soon as Popen() returns.
    with open(os.devnull, 'w') as devnull:
        return subprocess.Popen(argv, stderr=devnull)

def _stop_captures(captures):
    """Terminate and reap all capture processes in captures."""
    for proc in captures:
        try:
            proc.terminate()
        except OSError:
            # Process has already exited
            pass
    for proc in captures:
        proc.wait()

def _kill_from_pidfile(path, idx):
    """Send SIGTERM to the wpa_supplicant process recorded in a PID file.

    Returns True if a signal was sent and False if the file does not exist
    or the process could not be signalled. Failures are logged instead of
    raised so that the rest of the test cleanup still runs.
    """
    if not os.path.exists(path):
        return False
    try:
        with open(path, 'r') as f:
            pid = int(f.read().strip())
        logger.info("wpa_supplicant for wpas%d still running with pid %d - kill it" % (idx, pid))
        os.kill(pid, signal.SIGTERM)
    except (IOError, OSError, ValueError) as e:
        logger.info("Could not terminate wpa_supplicant for wpas%d: %s" % (idx, str(e)))
        return False
    return True

def cleanup_macsec():
    """Remove the veth0/veth1 wpa_supplicant interfaces and the veth pair."""
    wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
    wpas.interface_remove("veth0")
    wpas.interface_remove("veth1")
    _call_quiet(["ip", "link", "del", "veth0"])

def test_macsec_psk(dev, apdev, params):
    """MACsec PSK"""
    try:
        run_macsec_psk(dev, apdev, params, "macsec_psk")
    finally:
        cleanup_macsec()

def test_macsec_psk_mka_life_time(dev, apdev, params):
    """MACsec PSK - MKA life time"""
    try:
        run_macsec_psk(dev, apdev, params, "macsec_psk_mka_life_time")
        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_remove("veth1")
        # Wait for live peer to be removed on veth0
        time.sleep(6.1)
    finally:
        cleanup_macsec()

def test_macsec_psk_integ_only(dev, apdev, params):
    """MACsec PSK (integrity only)"""
    try:
        run_macsec_psk(dev, apdev, params, "macsec_psk_integ_only",
                       integ_only=True)
    finally:
        cleanup_macsec()

def test_macsec_psk_port(dev, apdev, params):
    """MACsec PSK (port)"""
    try:
        run_macsec_psk(dev, apdev, params, "macsec_psk_port",
                       port0=65534, port1=65534)
    finally:
        cleanup_macsec()

def test_macsec_psk_different_ports(dev, apdev, params):
    """MACsec PSK (different ports)"""
    try:
        run_macsec_psk(dev, apdev, params, "macsec_psk_different_ports",
                       port0=2, port1=3)
    finally:
        cleanup_macsec()

def test_macsec_psk_shorter_ckn(dev, apdev, params):
    """MACsec PSK (shorter CKN)"""
    try:
        ckn = "11223344"
        run_macsec_psk(dev, apdev, params, "macsec_psk_shorter_ckn",
                       ckn0=ckn, ckn1=ckn)
    finally:
        cleanup_macsec()

def test_macsec_psk_shorter_ckn2(dev, apdev, params):
    """MACsec PSK (shorter CKN, unaligned)"""
    try:
        ckn = "112233"
        run_macsec_psk(dev, apdev, params, "macsec_psk_shorter_ckn2",
                       ckn0=ckn, ckn1=ckn)
    finally:
        cleanup_macsec()

def test_macsec_psk_ckn_mismatch(dev, apdev, params):
    """MACsec PSK (CKN mismatch)"""
    try:
        ckn0 = "11223344"
        ckn1 = "1122334455667788"
        run_macsec_psk(dev, apdev, params, "macsec_psk_ckn_mismatch",
                       ckn0=ckn0, ckn1=ckn1, expect_failure=True)
    finally:
        cleanup_macsec()

def test_macsec_psk_cak_mismatch(dev, apdev, params):
    """MACsec PSK (CAK mismatch)"""
    try:
        cak0 = 16*"11"
        cak1 = 16*"22"
        run_macsec_psk(dev, apdev, params, "macsec_psk_cak_mismatch",
                       cak0=cak0, cak1=cak1, expect_failure=True)
    finally:
        cleanup_macsec()

def test_macsec_psk_256(dev, apdev, params):
    """MACsec PSK with 256-bit keys"""
    try:
        cak = "202122232425262728292a2b2c2d2e2f303132333435363738393a3b3c3d3e3f"
        run_macsec_psk(dev, apdev, params, "macsec_psk_256",
                       cak0=cak, cak1=cak)
    finally:
        cleanup_macsec()

def set_mka_psk_config(dev, mka_priority=None, integ_only=False, port=None,
                       ckn=None, cak=None):
    """Configure and select an MKA PSK network on a wpa_supplicant interface.

    dev is the WpaSupplicant instance to configure. cak and ckn default to
    fixed test values when None. mka_priority and port are written to the
    network only when given, and macsec_integ_only only when integ_only is
    set.
    """
    dev.set("eapol_version", "3")
    dev.set("ap_scan", "0")
    dev.set("fast_reauth", "1")

    net_id = dev.add_network()
    dev.set_network(net_id, "key_mgmt", "NONE")
    if cak is None:
        cak = "000102030405060708090a0b0c0d0e0f"
    dev.set_network(net_id, "mka_cak", cak)
    if ckn is None:
        ckn = "000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f"
    dev.set_network(net_id, "mka_ckn", ckn)
    dev.set_network(net_id, "eapol_flags", "0")
    dev.set_network(net_id, "macsec_policy", "1")
    if integ_only:
        dev.set_network(net_id, "macsec_integ_only", "1")
    if mka_priority is not None:
        dev.set_network(net_id, "mka_priority", str(mka_priority))
    if port is not None:
        dev.set_network(net_id, "macsec_port", str(port))

    dev.select_network(net_id)

def log_ip_macsec():
    """Log the output of "ip macsec show"."""
    res = _cmd_output(["ip", "macsec", "show"])
    logger.info("ip macsec:\n" + res)

def log_ip_link():
    """Log the output of "ip link show"."""
    res = _cmd_output(["ip", "link", "show"], quiet=False)
    logger.info("ip link:\n" + res)

def add_veth():
    """Create the veth0/veth1 pair; skip the test if that fails."""
    try:
        subprocess.check_call(["ip", "link", "add", "veth0", "type", "veth",
                               "peer", "name", "veth1"])
    except subprocess.CalledProcessError:
        raise HwsimSkip("veth not supported (kernel CONFIG_VETH)")

def add_wpas_interfaces(count=2):
    """Add veth0..veth<count-1> to wpa_supplicant with the macsec_linux driver.

    Returns the list of WpaSupplicant instances, one per interface. Raises
    HwsimSkip if a dynamic interface cannot be added; any other exception
    is passed through unchanged.
    """
    wpa = []
    try:
        for i in range(count):
            wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
            wpas.interface_add("veth%d" % i, driver="macsec_linux")
            wpa.append(wpas)
    except Exception as e:
        if "Failed to add a dynamic wpa_supplicant interface" in str(e):
            raise HwsimSkip(MACSEC_SKIP_REASON)
        raise

    return wpa

def lower_addr(addr1, addr2):
    """Return True if MAC address addr1 is numerically lower than addr2.

    Both addresses are colon-separated hexadecimal strings; the first six
    octets are compared from left to right. Equal addresses yield False.
    """
    a1 = addr1.split(':')
    a2 = addr2.split(':')
    for i in range(6):
        octet1 = int(a1[i], 16)
        octet2 = int(a2[i], 16)
        if octet1 < octet2:
            return True
        if octet1 > octet2:
            return False
    return False

def wait_mka_done(wpa, expect_failure=False):
    """Wait for MKA to complete on all interfaces and verify the key server.

    wpa is the list of WpaSupplicant instances taking part. With
    expect_failure, the function polls for a shorter time, raises if any
    interface becomes secured or sees a live peer, and returns without any
    key server checks. Otherwise it raises if MKA does not complete or if
    the elected key server is not the expected one (lowest Actor Priority
    value, ties broken by the lower MAC address).
    """
    max_iter = 14 if expect_failure else 40
    for i in range(max_iter):
        done = True
        for w in wpa:
            secured = w.get_status_field("Secured")
            peers = int(w.get_status_field("live_peers"))
            if expect_failure and (secured == "Yes" or peers > 0):
                raise Exception("MKA completed unexpectedly")
            if peers != len(wpa) - 1 or secured != "Yes":
                done = False
                break
            w.dump_monitor()
        if done:
            break
        time.sleep(0.5)

    if expect_failure:
        return

    if not done:
        raise Exception("MKA not completed successfully")

    key_server = None
    ks_addr = None
    ks_prio = 999
    for w in wpa:
        logger.info("%s STATUS:\n%s" % (w.ifname, w.request("STATUS")))
        addr = w.get_status_field("address")
        prio = int(w.get_status_field("Actor Priority"))
        # ks_addr is only consulted once a candidate has been recorded
        if key_server is None or prio < ks_prio or \
           (prio == ks_prio and lower_addr(addr, ks_addr)):
            key_server = w
            ks_addr = addr
            ks_prio = prio

    logger.info("Expected key server: " + key_server.ifname)
    if key_server.get_status_field("is_key_server") != "Yes":
        raise Exception("Expected key server was not elected")
    for w in wpa:
        if w != key_server and w.get_status_field("is_key_server") == "Yes":
            raise Exception("Unexpected key server")

def run_macsec_psk(dev, apdev, params, prefix, integ_only=False, port0=None,
                   port1=None, ckn0=None, ckn1=None, cak0=None, cak1=None,
                   expect_failure=False):
    """Run a two-device MACsec PSK test over a veth pair.

    Packet captures are written to params['logdir'] using prefix in the
    file names. The *0 arguments configure veth0 and the *1 arguments
    veth1. With expect_failure, the test only verifies that MKA does not
    complete; otherwise it also checks the peer list MIB values and runs a
    data connectivity test over the MACsec interfaces.
    """
    add_veth()

    cap_veth0 = os.path.join(params['logdir'], prefix + ".veth0.pcap")
    cap_veth1 = os.path.join(params['logdir'], prefix + ".veth1.pcap")
    cap_macsec0 = os.path.join(params['logdir'], prefix + ".macsec0.pcap")
    cap_macsec1 = os.path.join(params['logdir'], prefix + ".macsec1.pcap")

    for i in range(2):
        subprocess.check_call(["ip", "link", "set", "dev", "veth%d" % i,
                               "up"])

    captures = []
    # Stop the captures on every exit path, including test failures and
    # skips, so that no tcpdump process is left behind.
    try:
        captures.append(_start_tcpdump('veth0', cap_veth0))
        captures.append(_start_tcpdump('veth1', cap_veth1))

        wpa = add_wpas_interfaces()
        wpas0 = wpa[0]
        wpas1 = wpa[1]

        set_mka_psk_config(wpas0, integ_only=integ_only, port=port0,
                           ckn=ckn0, cak=cak0)
        set_mka_psk_config(wpas1, mka_priority=100, integ_only=integ_only,
                           port=port1, ckn=ckn1, cak=cak1)

        log_ip_macsec()
        log_ip_link()

        logger.info("wpas0 STATUS:\n" + wpas0.request("STATUS"))
        logger.info("wpas1 STATUS:\n" + wpas1.request("STATUS"))
        logger.info("wpas0 STATUS-DRIVER:\n" + wpas0.request("STATUS-DRIVER"))
        logger.info("wpas1 STATUS-DRIVER:\n" + wpas1.request("STATUS-DRIVER"))
        macsec_ifname0 = wpas0.get_driver_status_field("parent_ifname")
        macsec_ifname1 = wpas1.get_driver_status_field("parent_ifname")

        wait_mka_done(wpa, expect_failure=expect_failure)

        if expect_failure:
            return

        captures.append(_start_tcpdump(macsec_ifname0, cap_macsec0))
        captures.append(_start_tcpdump(macsec_ifname1, cap_macsec1))
        time.sleep(0.5)

        mi0 = wpas0.get_status_field("mi")
        mi1 = wpas1.get_status_field("mi")
        sci0 = wpas0.get_status_field("actor_sci")
        sci1 = wpas1.get_status_field("actor_sci")
        logger.info("wpas0 MIB:\n" + wpas0.request("MIB"))
        logger.info("wpas1 MIB:\n" + wpas1.request("MIB"))
        mib0 = wpas0.get_mib()
        mib1 = wpas1.get_mib()

        if mib0['ieee8021XKayMkaPeerListMI'] != mi1:
            raise Exception("Unexpected ieee8021XKayMkaPeerListMI value (0)")
        if mib0['ieee8021XKayMkaPeerListType'] != "1":
            raise Exception("Unexpected ieee8021XKayMkaPeerListType value (0)")
        if mib0['ieee8021XKayMkaPeerListSCI'] != sci1:
            raise Exception("Unexpected ieee8021XKayMkaPeerListSCI value (0)")
        if mib1['ieee8021XKayMkaPeerListMI'] != mi0:
            raise Exception("Unexpected ieee8021XKayMkaPeerListMI value (1)")
        if mib1['ieee8021XKayMkaPeerListType'] != "1":
            raise Exception("Unexpected ieee8021XKayMkaPeerListType value (1)")
        if mib1['ieee8021XKayMkaPeerListSCI'] != sci0:
            raise Exception("Unexpected ieee8021XKayMkaPeerListSCI value (1)")

        logger.info("wpas0 STATUS:\n" + wpas0.request("STATUS"))
        logger.info("wpas1 STATUS:\n" + wpas1.request("STATUS"))
        log_ip_macsec()
        hwsim_utils.test_connectivity(wpas0, wpas1,
                                      ifname1=macsec_ifname0,
                                      ifname2=macsec_ifname1,
                                      send_len=1400)
        log_ip_macsec()

        time.sleep(1)
    finally:
        _stop_captures(captures)

def cleanup_macsec_br(count):
    """Remove count veth interfaces and the brveth bridge."""
    wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
    for i in range(count):
        wpas.interface_remove("veth%d" % i)
        _call_quiet(["ip", "link", "del", "veth%d" % i])
    subprocess.call(["ip", "link", "set", "brveth", "down"])
    subprocess.call(["brctl", "delbr", "brveth"])

def test_macsec_psk_br2(dev, apdev):
    """MACsec PSK (bridge; 2 devices)"""
    try:
        run_macsec_psk_br(dev, apdev, 2, [10, 20])
    finally:
        cleanup_macsec_br(count=2)

def test_macsec_psk_br2_same_prio(dev, apdev):
    """MACsec PSK (bridge; 2 devices, same mka_priority)"""
    try:
        run_macsec_psk_br(dev, apdev, 2, [None, None])
    finally:
        cleanup_macsec_br(count=2)

def test_macsec_psk_br3(dev, apdev):
    """MACsec PSK (bridge; 3 devices)"""
    try:
        run_macsec_psk_br(dev, apdev, 3, [10, 20, 30])
    finally:
        cleanup_macsec_br(count=3)

def test_macsec_psk_br3_same_prio(dev, apdev):
    """MACsec PSK (bridge; 3 devices, same mka_priority)"""
    try:
        run_macsec_psk_br(dev, apdev, 3, [None, None, None])
    finally:
        cleanup_macsec_br(count=3)

def run_macsec_psk_br(dev, apdev, count, mka_priority):
    """Run a MACsec PSK test with count devices connected through a bridge.

    mka_priority is a list with one entry per device; None leaves the
    priority of that device unset. Data connectivity is tested from device
    0 to every other device and from device 1 to devices 2 and up.
    """
    subprocess.check_call(["brctl", "addbr", "brveth"])
    subprocess.call(["echo 8 > /sys/devices/virtual/net/brveth/bridge/group_fwd_mask"],
                    shell=True)

    try:
        for i in range(count):
            subprocess.check_call(["ip", "link", "add", "veth%d" % i,
                                   "type", "veth",
                                   "peer", "name", "vethbr%d" % i])
            subprocess.check_call(["ip", "link", "set", "vethbr%d" % i, "up"])
            subprocess.check_call(["brctl", "addif", "brveth",
                                   "vethbr%d" % i])
    except subprocess.CalledProcessError:
        raise HwsimSkip("veth not supported (kernel CONFIG_VETH)")

    subprocess.check_call(["ip", "link", "set", "brveth", "up"])

    log_ip_link()

    wpa = add_wpas_interfaces(count=count)
    for i in range(count):
        set_mka_psk_config(wpa[i], mka_priority=mka_priority[i])
        wpa[i].dump_monitor()
    wait_mka_done(wpa)

    macsec_ifname = []
    for i in range(count):
        macsec_ifname.append(wpa[i].get_driver_status_field("parent_ifname"))

    timeout = 2
    max_tries = 2 if count > 2 else 1
    success_seen = False
    failure_seen = False
    for i in range(1, count):
        try:
            hwsim_utils.test_connectivity(wpa[0], wpa[i],
                                          ifname1=macsec_ifname[0],
                                          ifname2=macsec_ifname[i],
                                          send_len=1400,
                                          timeout=timeout, max_tries=max_tries)
            success_seen = True
            logger.info("Traffic test %d<->%d success" % (0, i))
        except Exception:
            # Only test failures are tolerated here; KeyboardInterrupt and
            # SystemExit must still propagate.
            failure_seen = True
            logger.info("Traffic test %d<->%d failure" % (0, i))
    for i in range(2, count):
        try:
            hwsim_utils.test_connectivity(wpa[1], wpa[i],
                                          ifname1=macsec_ifname[1],
                                          ifname2=macsec_ifname[i],
                                          send_len=1400,
                                          timeout=timeout, max_tries=max_tries)
            success_seen = True
            logger.info("Traffic test %d<->%d success" % (1, i))
        except Exception:
            failure_seen = True
            logger.info("Traffic test %d<->%d failure" % (1, i))

    if not success_seen:
        raise Exception("None of the data traffic tests succeeded")

    # Something seems to be failing with three device tests semi-regularly, so
    # do not report this as a failed test case until the real reason behind
    # those failures has been determined.
    if failure_seen:
        if count < 3:
            raise Exception("Data traffic test failed")
        else:
            logger.info("Data traffic test failed - ignore for now for >= 3 device cases")

def test_macsec_psk_ns(dev, apdev, params):
    """MACsec PSK (netns)"""
    try:
        run_macsec_psk_ns(dev, apdev, params)
    finally:
        prefix = "macsec_psk_ns"
        pidfile = os.path.join(params['logdir'], prefix + ".pid")
        for i in range(2):
            if _kill_from_pidfile(pidfile + str(i), i):
                # Give the signalled process time to exit
                time.sleep(1)

        _call_quiet(["ip", "netns", "exec", "ns0",
                     "ip", "link", "del", "veth0"])
        _call_quiet(["ip", "link", "del", "veth0"])
        log_ip_link_ns()
        _call_quiet(["ip", "netns", "delete", "ns0"])
        _call_quiet(["ip", "netns", "delete", "ns1"])

def log_ip_macsec_ns():
    """Log "ip macsec show" for the default namespace, ns0, and ns1."""
    res = _cmd_output(["ip", "macsec", "show"])
    logger.info("ip macsec show:\n" + res)

    res = _cmd_output(["ip", "netns", "exec", "ns0",
                       "ip", "macsec", "show"])
    logger.info("ip macsec show (ns0):\n" + res)

    res = _cmd_output(["ip", "netns", "exec", "ns1",
                       "ip", "macsec", "show"])
    logger.info("ip macsec show (ns1):\n" + res)

def log_ip_link_ns():
    """Log "ip link show" for the default namespace, ns0, and ns1."""
    res = _cmd_output(["ip", "link", "show"], quiet=False)
    logger.info("ip link:\n" + res)

    res = _cmd_output(["ip", "netns", "exec", "ns0",
                       "ip", "link", "show"])
    logger.info("ip link show (ns0):\n" + res)

    res = _cmd_output(["ip", "netns", "exec", "ns1",
                       "ip", "link", "show"])
    logger.info("ip link show (ns1):\n" + res)

def write_conf(conffile, mka_priority=None):
    """Write a wpa_supplicant configuration file for an MKA PSK network.

    The mka_priority line is included only when mka_priority is given.
    """
    with open(conffile, 'w') as f:
        f.write("ctrl_interface=DIR=/var/run/wpa_supplicant\n")
        f.write("eapol_version=3\n")
        f.write("ap_scan=0\n")
        f.write("fast_reauth=1\n")
        f.write("network={\n")
        f.write("   key_mgmt=NONE\n")
        f.write("   mka_cak=000102030405060708090a0b0c0d0e0f\n")
        f.write("   mka_ckn=000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f\n")
        if mka_priority is not None:
            f.write("   mka_priority=%d\n" % mka_priority)
        f.write("   eapol_flags=0\n")
        f.write("   macsec_policy=1\n")
        f.write("}\n")

def run_macsec_psk_ns(dev, apdev, params):
    """Run a MACsec PSK test with each end in its own network namespace.

    veth0 and veth1 are moved into namespaces ns0 and ns1 and a separate
    wpa_supplicant process is started in each. After waiting for key
    distribution, connectivity is verified both with hwsim_utils and with
    ping over the MACsec interfaces. Configuration, log, PID, and capture
    files are written to params['logdir'].
    """
    try:
        subprocess.check_call(["ip", "link", "add", "veth0", "type", "veth",
                               "peer", "name", "veth1"])
    except subprocess.CalledProcessError:
        raise HwsimSkip("veth not supported (kernel CONFIG_VETH)")

    prefix = "macsec_psk_ns"
    conffile = os.path.join(params['logdir'], prefix + ".conf")
    pidfile = os.path.join(params['logdir'], prefix + ".pid")
    logfile0 = os.path.join(params['logdir'], prefix + ".veth0.log")
    logfile1 = os.path.join(params['logdir'], prefix + ".veth1.log")
    cap_veth0 = os.path.join(params['logdir'], prefix + ".veth0.pcap")
    cap_veth1 = os.path.join(params['logdir'], prefix + ".veth1.pcap")
    cap_macsec0 = os.path.join(params['logdir'], prefix + ".macsec0.pcap")
    cap_macsec1 = os.path.join(params['logdir'], prefix + ".macsec1.pcap")

    for i in range(2):
        try:
            subprocess.check_call(["ip", "netns", "add", "ns%d" % i])
        except subprocess.CalledProcessError:
            raise HwsimSkip("network namespace not supported (kernel CONFIG_NAMESPACES, CONFIG_NET_NS)")
        subprocess.check_call(["ip", "link", "set", "veth%d" % i,
                               "netns", "ns%d" % i])
        subprocess.check_call(["ip", "netns", "exec", "ns%d" % i,
                               "ip", "link", "set", "dev", "veth%d" % i,
                               "up"])

    captures = []
    # Stop the captures on every exit path, including test failures and
    # skips, so that no tcpdump process is left behind.
    try:
        captures.append(_start_tcpdump('veth0', cap_veth0, netns='ns0'))
        captures.append(_start_tcpdump('veth1', cap_veth1, netns='ns1'))

        write_conf(conffile + '0')
        write_conf(conffile + '1', mka_priority=100)

        prg = os.path.join(params['logdir'],
                           'alt-wpa_supplicant/wpa_supplicant/wpa_supplicant')
        if not os.path.exists(prg):
            prg = '../../wpa_supplicant/wpa_supplicant'

        arg = ["ip", "netns", "exec", "ns0",
               prg, '-BdddtKW', '-P', pidfile + '0', '-f', logfile0,
               '-g', '/tmp/wpas-veth0',
               '-Dmacsec_linux', '-c', conffile + '0', '-i', "veth0"]
        logger.info("Start wpa_supplicant: " + str(arg))
        try:
            subprocess.check_call(arg)
        except subprocess.CalledProcessError:
            raise HwsimSkip(MACSEC_SKIP_REASON)

        if os.path.exists("wpa_supplicant-macsec2"):
            logger.info("Use alternative wpa_supplicant binary for one of the macsec devices")
            prg = "wpa_supplicant-macsec2"

        arg = ["ip", "netns", "exec", "ns1",
               prg, '-BdddtKW', '-P', pidfile + '1', '-f', logfile1,
               '-g', '/tmp/wpas-veth1',
               '-Dmacsec_linux', '-c', conffile + '1', '-i', "veth1"]
        logger.info("Start wpa_supplicant: " + str(arg))
        subprocess.check_call(arg)

        wpas0 = WpaSupplicant('veth0', '/tmp/wpas-veth0')
        wpas1 = WpaSupplicant('veth1', '/tmp/wpas-veth1')

        log_ip_macsec_ns()
        log_ip_link_ns()

        logger.info("wpas0 STATUS:\n" + wpas0.request("STATUS"))
        logger.info("wpas1 STATUS:\n" + wpas1.request("STATUS"))
        logger.info("wpas0 STATUS-DRIVER:\n" + wpas0.request("STATUS-DRIVER"))
        logger.info("wpas1 STATUS-DRIVER:\n" + wpas1.request("STATUS-DRIVER"))
        macsec_ifname0 = wpas0.get_driver_status_field("parent_ifname")
        macsec_ifname1 = wpas1.get_driver_status_field("parent_ifname")

        # Poll until wpas1 has distributed a key and wpas0 has received one
        for i in range(10):
            key_rx0 = int(wpas0.get_status_field("Number of Keys Received"))
            key_tx1 = int(wpas1.get_status_field("Number of Keys Distributed"))
            if key_rx0 > 0 and key_tx1 > 0:
                break
            time.sleep(1)

        # Each MACsec interface has to be captured in the namespace in which
        # it exists: macsec_ifname1 belongs to the supplicant running in ns1.
        captures.append(_start_tcpdump(macsec_ifname0, cap_macsec0,
                                       netns='ns0'))
        captures.append(_start_tcpdump(macsec_ifname1, cap_macsec1,
                                       netns='ns1'))
        time.sleep(0.5)

        logger.info("wpas0 STATUS:\n" + wpas0.request("STATUS"))
        logger.info("wpas1 STATUS:\n" + wpas1.request("STATUS"))
        log_ip_macsec_ns()
        hwsim_utils.test_connectivity(wpas0, wpas1,
                                      ifname1=macsec_ifname0,
                                      ifname2=macsec_ifname1,
                                      send_len=1400)
        log_ip_macsec_ns()

        subprocess.check_call(['ip', 'netns', 'exec', 'ns0',
                               'ip', 'addr', 'add', '192.168.248.17/30',
                               'dev', macsec_ifname0])
        subprocess.check_call(['ip', 'netns', 'exec', 'ns1',
                               'ip', 'addr', 'add', '192.168.248.18/30',
                               'dev', macsec_ifname1])
        res = _cmd_output(['ip', 'netns', 'exec', 'ns0',
                           'ping', '-c', '2', '192.168.248.18'], quiet=False)
        logger.info("ping:\n" + res)
        if "2 packets transmitted, 2 received" not in res:
            raise Exception("ping did not work")

        wpas0.request("TERMINATE")
        del wpas0
        wpas1.request("TERMINATE")
        del wpas1

        time.sleep(1)
    finally:
        _stop_captures(captures)

def test_macsec_psk_fail_cp(dev, apdev):
    """MACsec PSK local failures in CP state machine"""
    try:
        add_veth()
        wpa = add_wpas_interfaces()
        set_mka_psk_config(wpa[0])
        with alloc_fail(wpa[0], 1, "sm_CP_RECEIVE_Enter"):
            set_mka_psk_config(wpa[1])
            wait_fail_trigger(wpa[0], "GET_ALLOC_FAIL", max_iter=100)

        wait_mka_done(wpa)
    finally:
        cleanup_macsec()

def test_macsec_psk_fail_cp2(dev, apdev):
    """MACsec PSK local failures in CP state machine (2)"""
    try:
        add_veth()
        wpa = add_wpas_interfaces()
        set_mka_psk_config(wpa[0])
        with alloc_fail(wpa[1], 1, "ieee802_1x_cp_sm_init"):
            set_mka_psk_config(wpa[1])
            wait_fail_trigger(wpa[1], "GET_ALLOC_FAIL", max_iter=100)

        wait_mka_done(wpa)
    finally:
        cleanup_macsec()