hostap/tests/hwsim/test_macsec.py
Jouni Malinen fe5400dda2 tests: Make MACsec test cases clear monitor socket more thoroughly
The wpas (dev5) control interface socket did not always get cleared in
the MACsec test cases and this could result in issues with following
test cases if the dev5 message queue hit the maximum limit.

Signed-off-by: Jouni Malinen <j@w1.fi>
2019-02-25 21:40:23 +02:00

690 lines
26 KiB
Python

# Test cases for MACsec/MKA
# Copyright (c) 2018, Jouni Malinen <j@w1.fi>
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
import logging
logger = logging.getLogger()
import binascii
import os
import signal
import subprocess
import time
from wpasupplicant import WpaSupplicant
import hwsim_utils
from utils import HwsimSkip, alloc_fail, fail_test, wait_fail_trigger
def cleanup_macsec():
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
wpas.interface_remove("veth0")
wpas.interface_remove("veth1")
subprocess.call(["ip", "link", "del", "veth0"],
stderr=open('/dev/null', 'w'))
def test_macsec_psk(dev, apdev, params):
"""MACsec PSK"""
try:
run_macsec_psk(dev, apdev, params, "macsec_psk")
finally:
cleanup_macsec()
def test_macsec_psk_mka_life_time(dev, apdev, params):
"""MACsec PSK - MKA life time"""
try:
run_macsec_psk(dev, apdev, params, "macsec_psk_mka_life_time")
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
wpas.interface_remove("veth1")
# Wait for live peer to be removed on veth0
time.sleep(6.1)
finally:
cleanup_macsec()
def test_macsec_psk_integ_only(dev, apdev, params):
"""MACsec PSK (integrity only)"""
try:
run_macsec_psk(dev, apdev, params, "macsec_psk_integ_only",
integ_only=True)
finally:
cleanup_macsec()
def test_macsec_psk_port(dev, apdev, params):
"""MACsec PSK (port)"""
try:
run_macsec_psk(dev, apdev, params, "macsec_psk_port",
port0=65534, port1=65534)
finally:
cleanup_macsec()
def test_macsec_psk_different_ports(dev, apdev, params):
"""MACsec PSK (different ports)"""
try:
run_macsec_psk(dev, apdev, params, "macsec_psk_different_ports",
port0=2, port1=3)
finally:
cleanup_macsec()
def test_macsec_psk_shorter_ckn(dev, apdev, params):
"""MACsec PSK (shorter CKN)"""
try:
ckn = "11223344"
run_macsec_psk(dev, apdev, params, "macsec_psk_shorter_ckn",
ckn0=ckn, ckn1=ckn)
finally:
cleanup_macsec()
def test_macsec_psk_shorter_ckn2(dev, apdev, params):
"""MACsec PSK (shorter CKN, unaligned)"""
try:
ckn = "112233"
run_macsec_psk(dev, apdev, params, "macsec_psk_shorter_ckn2",
ckn0=ckn, ckn1=ckn)
finally:
cleanup_macsec()
def test_macsec_psk_ckn_mismatch(dev, apdev, params):
"""MACsec PSK (CKN mismatch)"""
try:
ckn0 = "11223344"
ckn1 = "1122334455667788"
run_macsec_psk(dev, apdev, params, "macsec_psk_ckn_mismatch",
ckn0=ckn0, ckn1=ckn1, expect_failure=True)
finally:
cleanup_macsec()
def test_macsec_psk_cak_mismatch(dev, apdev, params):
"""MACsec PSK (CAK mismatch)"""
try:
cak0 = 16*"11"
cak1 = 16*"22"
run_macsec_psk(dev, apdev, params, "macsec_psk_cak_mismatch",
cak0=cak0, cak1=cak1, expect_failure=True)
finally:
cleanup_macsec()
def test_macsec_psk_256(dev, apdev, params):
"""MACsec PSK with 256-bit keys"""
try:
cak = "202122232425262728292a2b2c2d2e2f303132333435363738393a3b3c3d3e3f"
run_macsec_psk(dev, apdev, params, "macsec_psk_256", cak0=cak, cak1=cak)
finally:
cleanup_macsec()
def set_mka_psk_config(dev, mka_priority=None, integ_only=False, port=None,
ckn=None, cak=None):
dev.set("eapol_version", "3")
dev.set("ap_scan", "0")
dev.set("fast_reauth", "1")
id = dev.add_network()
dev.set_network(id, "key_mgmt", "NONE")
if cak is None:
cak = "000102030405060708090a0b0c0d0e0f"
dev.set_network(id, "mka_cak", cak)
if ckn is None:
ckn = "000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f"
dev.set_network(id, "mka_ckn", ckn)
dev.set_network(id, "eapol_flags", "0")
dev.set_network(id, "macsec_policy", "1")
if integ_only:
dev.set_network(id, "macsec_integ_only", "1")
if mka_priority is not None:
dev.set_network(id, "mka_priority", str(mka_priority))
if port is not None:
dev.set_network(id, "macsec_port", str(port))
dev.select_network(id)
def log_ip_macsec():
cmd = subprocess.Popen([ "ip", "macsec", "show" ],
stdout=subprocess.PIPE,
stderr=open('/dev/null', 'w'))
res = cmd.stdout.read().decode()
cmd.stdout.close()
logger.info("ip macsec:\n" + res)
def log_ip_link():
cmd = subprocess.Popen([ "ip", "link", "show" ],
stdout=subprocess.PIPE)
res = cmd.stdout.read().decode()
cmd.stdout.close()
logger.info("ip link:\n" + res)
def add_veth():
try:
subprocess.check_call([ "ip", "link", "add", "veth0", "type", "veth",
"peer", "name", "veth1" ])
except subprocess.CalledProcessError:
raise HwsimSkip("veth not supported (kernel CONFIG_VETH)")
def add_wpas_interfaces(count=2):
wpa = []
try:
for i in range(count):
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
wpas.interface_add("veth%d" % i, driver="macsec_linux")
wpa.append(wpas)
except Exception as e:
if "Failed to add a dynamic wpa_supplicant interface" in str(e):
raise HwsimSkip("macsec supported (wpa_supplicant CONFIG_MACSEC, CONFIG_DRIVER_MACSEC_LINUX; kernel CONFIG_MACSEC)")
raise
return wpa
def lower_addr(addr1, addr2):
a1 = addr1.split(':')
a2 = addr2.split(':')
for i in range(6):
if binascii.unhexlify(a1[i]) < binascii.unhexlify(a2[i]):
return True
if binascii.unhexlify(a1[i]) > binascii.unhexlify(a2[i]):
return False
return False
def wait_mka_done(wpa, expect_failure=False):
max_iter = 14 if expect_failure else 40
for i in range(max_iter):
done = True
for w in wpa:
secured = w.get_status_field("Secured")
peers = int(w.get_status_field("live_peers"))
if expect_failure and (secured == "Yes" or peers > 0):
raise Exception("MKA completed unexpectedly")
if peers != len(wpa) - 1 or secured != "Yes":
done = False
break
w.dump_monitor()
if done:
break
time.sleep(0.5)
if expect_failure:
return
if not done:
raise Exception("MKA not completed successfully")
key_server = None
ks_prio = 999
for w in wpa:
logger.info("%s STATUS:\n%s" % (w.ifname, w.request("STATUS")))
addr = w.get_status_field("address")
prio = int(w.get_status_field("Actor Priority"))
if key_server is None or prio < ks_prio or \
(prio == ks_prio and lower_addr(addr, ks_addr)):
key_server = w
ks_addr = addr
ks_prio = prio
logger.info("Expected key server: " + key_server.ifname)
if key_server.get_status_field("is_key_server") != "Yes":
raise Exception("Expected key server was not elected")
for w in wpa:
if w != key_server and w.get_status_field("is_key_server") == "Yes":
raise Exception("Unexpected key server")
def run_macsec_psk(dev, apdev, params, prefix, integ_only=False, port0=None,
port1=None, ckn0=None, ckn1=None, cak0=None, cak1=None,
expect_failure=False):
add_veth()
cap_veth0 = os.path.join(params['logdir'], prefix + ".veth0.pcap")
cap_veth1 = os.path.join(params['logdir'], prefix + ".veth1.pcap")
cap_macsec0 = os.path.join(params['logdir'], prefix + ".macsec0.pcap")
cap_macsec1 = os.path.join(params['logdir'], prefix + ".macsec1.pcap")
for i in range(2):
subprocess.check_call([ "ip", "link", "set", "dev", "veth%d" % i,
"up" ])
cmd = {}
cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth0',
'-w', cap_veth0, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth1',
'-w', cap_veth1, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
wpa = add_wpas_interfaces()
wpas0 = wpa[0]
wpas1 = wpa[1]
set_mka_psk_config(wpas0, integ_only=integ_only, port=port0, ckn=ckn0,
cak=cak0)
set_mka_psk_config(wpas1, mka_priority=100, integ_only=integ_only,
port=port1, ckn=ckn1, cak=cak1)
log_ip_macsec()
log_ip_link()
logger.info("wpas0 STATUS:\n" + wpas0.request("STATUS"))
logger.info("wpas1 STATUS:\n" + wpas1.request("STATUS"))
logger.info("wpas0 STATUS-DRIVER:\n" + wpas0.request("STATUS-DRIVER"))
logger.info("wpas1 STATUS-DRIVER:\n" + wpas1.request("STATUS-DRIVER"))
macsec_ifname0 = wpas0.get_driver_status_field("parent_ifname")
macsec_ifname1 = wpas1.get_driver_status_field("parent_ifname")
wait_mka_done(wpa, expect_failure=expect_failure)
if expect_failure:
for i in range(len(cmd)):
cmd[i].terminate()
return
cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname0,
'-w', cap_macsec0, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname1,
'-w', cap_macsec1, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
time.sleep(0.5)
mi0 = wpas0.get_status_field("mi")
mi1 = wpas1.get_status_field("mi")
sci0 = wpas0.get_status_field("actor_sci")
sci1 = wpas1.get_status_field("actor_sci")
logger.info("wpas0 MIB:\n" + wpas0.request("MIB"))
logger.info("wpas1 MIB:\n" + wpas1.request("MIB"))
mib0 = wpas0.get_mib()
mib1 = wpas1.get_mib()
if mib0['ieee8021XKayMkaPeerListMI'] != mi1:
raise Exception("Unexpected ieee8021XKayMkaPeerListMI value (0)")
if mib0['ieee8021XKayMkaPeerListType'] != "1":
raise Exception("Unexpected ieee8021XKayMkaPeerListType value (0)")
if mib0['ieee8021XKayMkaPeerListSCI'] != sci1:
raise Exception("Unexpected ieee8021XKayMkaPeerListSCI value (0)")
if mib1['ieee8021XKayMkaPeerListMI'] != mi0:
raise Exception("Unexpected ieee8021XKayMkaPeerListMI value (1)")
if mib1['ieee8021XKayMkaPeerListType'] != "1":
raise Exception("Unexpected ieee8021XKayMkaPeerListType value (1)")
if mib1['ieee8021XKayMkaPeerListSCI'] != sci0:
raise Exception("Unexpected ieee8021XKayMkaPeerListSCI value (1)")
logger.info("wpas0 STATUS:\n" + wpas0.request("STATUS"))
logger.info("wpas1 STATUS:\n" + wpas1.request("STATUS"))
log_ip_macsec()
hwsim_utils.test_connectivity(wpas0, wpas1,
ifname1=macsec_ifname0,
ifname2=macsec_ifname1,
send_len=1400)
log_ip_macsec()
time.sleep(1)
for i in range(len(cmd)):
cmd[i].terminate()
def cleanup_macsec_br(count):
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
for i in range(count):
wpas.interface_remove("veth%d" % i)
subprocess.call(["ip", "link", "del", "veth%d" % i],
stderr=open('/dev/null', 'w'))
del wpas
subprocess.call(["ip", "link", "set", "brveth", "down"])
subprocess.call(["brctl", "delbr", "brveth"])
def test_macsec_psk_br2(dev, apdev):
"""MACsec PSK (bridge; 2 devices)"""
try:
run_macsec_psk_br(dev, apdev, 2, [10, 20])
finally:
cleanup_macsec_br(count=2)
def test_macsec_psk_br2_same_prio(dev, apdev):
"""MACsec PSK (bridge; 2 devices, same mka_priority)"""
try:
run_macsec_psk_br(dev, apdev, 2, [None, None])
finally:
cleanup_macsec_br(count=2)
def test_macsec_psk_br3(dev, apdev):
"""MACsec PSK (bridge; 3 devices)"""
try:
run_macsec_psk_br(dev, apdev, 3, [10, 20, 30])
finally:
cleanup_macsec_br(count=3)
def test_macsec_psk_br3_same_prio(dev, apdev):
"""MACsec PSK (bridge; 3 devices, same mka_priority)"""
try:
run_macsec_psk_br(dev, apdev, 3, [None, None, None])
finally:
cleanup_macsec_br(count=3)
def run_macsec_psk_br(dev, apdev, count, mka_priority):
subprocess.check_call(["brctl", "addbr", "brveth"])
subprocess.call(["echo 8 > /sys/devices/virtual/net/brveth/bridge/group_fwd_mask"],
shell=True)
try:
for i in range(count):
subprocess.check_call([ "ip", "link", "add", "veth%d" % i,
"type", "veth",
"peer", "name", "vethbr%d" % i ])
subprocess.check_call(["ip", "link", "set", "vethbr%d" % i, "up"])
subprocess.check_call([ "brctl", "addif", "brveth",
"vethbr%d" % i ])
except subprocess.CalledProcessError:
raise HwsimSkip("veth not supported (kernel CONFIG_VETH)")
subprocess.check_call(["ip", "link", "set", "brveth", "up"])
log_ip_link()
wpa = add_wpas_interfaces(count=count)
for i in range(count):
set_mka_psk_config(wpa[i], mka_priority=mka_priority[i])
wpa[i].dump_monitor()
wait_mka_done(wpa)
macsec_ifname = []
for i in range(count):
macsec_ifname.append(wpa[i].get_driver_status_field("parent_ifname"))
timeout = 2
max_tries = 2 if count > 2 else 1
success_seen = False
failure_seen = False
for i in range(1, count):
try:
hwsim_utils.test_connectivity(wpa[0], wpa[i],
ifname1=macsec_ifname[0],
ifname2=macsec_ifname[i],
send_len=1400,
timeout=timeout, max_tries=max_tries)
success_seen = True
logger.info("Traffic test %d<->%d success" % (0, i))
except:
failure_seen = True
logger.info("Traffic test %d<->%d failure" % (0, i))
for i in range(2, count):
try:
hwsim_utils.test_connectivity(wpa[1], wpa[i],
ifname1=macsec_ifname[1],
ifname2=macsec_ifname[i],
send_len=1400,
timeout=timeout, max_tries=max_tries)
success_seen = True
logger.info("Traffic test %d<->%d success" % (1, i))
except:
failure_seen = True
logger.info("Traffic test %d<->%d failure" % (1, i))
if not success_seen:
raise Exception("None of the data traffic tests succeeded")
# Something seems to be failing with three device tests semi-regularly, so
# do not report this as a failed test case until the real reason behind
# those failures have been determined.
if failure_seen:
if count < 3:
raise Exception("Data traffic test failed")
else:
logger.info("Data traffic test failed - ignore for now for >= 3 device cases")
for i in range(count):
wpa[i].dump_monitor()
for i in range(count):
del wpa[0]
def test_macsec_psk_ns(dev, apdev, params):
"""MACsec PSK (netns)"""
try:
run_macsec_psk_ns(dev, apdev, params)
finally:
prefix = "macsec_psk_ns"
pidfile = os.path.join(params['logdir'], prefix + ".pid")
for i in range(2):
was_running = False
if os.path.exists(pidfile + str(i)):
with open(pidfile + str(i), 'r') as f:
pid = int(f.read().strip())
logger.info("wpa_supplicant for wpas%d still running with pid %d - kill it" % (i, pid))
was_running = True
os.kill(pid, signal.SIGTERM)
if was_running:
time.sleep(1)
subprocess.call(["ip", "netns", "exec", "ns0",
"ip", "link", "del", "veth0"],
stderr=open('/dev/null', 'w'))
subprocess.call(["ip", "link", "del", "veth0"],
stderr=open('/dev/null', 'w'))
log_ip_link_ns()
subprocess.call(["ip", "netns", "delete", "ns0"],
stderr=open('/dev/null', 'w'))
subprocess.call(["ip", "netns", "delete", "ns1"],
stderr=open('/dev/null', 'w'))
def log_ip_macsec_ns():
cmd = subprocess.Popen([ "ip", "macsec", "show" ],
stdout=subprocess.PIPE,
stderr=open('/dev/null', 'w'))
res = cmd.stdout.read().decode()
cmd.stdout.close()
logger.info("ip macsec show:\n" + res)
cmd = subprocess.Popen([ "ip", "netns", "exec", "ns0",
"ip", "macsec", "show" ],
stdout=subprocess.PIPE,
stderr=open('/dev/null', 'w'))
res = cmd.stdout.read().decode()
cmd.stdout.close()
logger.info("ip macsec show (ns0):\n" + res)
cmd = subprocess.Popen([ "ip", "netns", "exec", "ns1",
"ip", "macsec", "show" ],
stdout=subprocess.PIPE,
stderr=open('/dev/null', 'w'))
res = cmd.stdout.read().decode()
cmd.stdout.close()
logger.info("ip macsec show (ns1):\n" + res)
def log_ip_link_ns():
cmd = subprocess.Popen([ "ip", "link", "show" ],
stdout=subprocess.PIPE)
res = cmd.stdout.read().decode()
cmd.stdout.close()
logger.info("ip link:\n" + res)
cmd = subprocess.Popen([ "ip", "netns", "exec", "ns0",
"ip", "link", "show" ],
stdout=subprocess.PIPE,
stderr=open('/dev/null', 'w'))
res = cmd.stdout.read().decode()
cmd.stdout.close()
logger.info("ip link show (ns0):\n" + res)
cmd = subprocess.Popen([ "ip", "netns", "exec", "ns1",
"ip", "link", "show" ],
stdout=subprocess.PIPE,
stderr=open('/dev/null', 'w'))
res = cmd.stdout.read().decode()
cmd.stdout.close()
logger.info("ip link show (ns1):\n" + res)
def write_conf(conffile, mka_priority=None):
with open(conffile, 'w') as f:
f.write("ctrl_interface=DIR=/var/run/wpa_supplicant\n")
f.write("eapol_version=3\n")
f.write("ap_scan=0\n")
f.write("fast_reauth=1\n")
f.write("network={\n")
f.write(" key_mgmt=NONE\n")
f.write(" mka_cak=000102030405060708090a0b0c0d0e0f\n")
f.write(" mka_ckn=000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f\n")
if mka_priority is not None:
f.write(" mka_priority=%d\n" % mka_priority)
f.write(" eapol_flags=0\n")
f.write(" macsec_policy=1\n")
f.write("}\n")
def run_macsec_psk_ns(dev, apdev, params):
try:
subprocess.check_call([ "ip", "link", "add", "veth0", "type", "veth",
"peer", "name", "veth1" ])
except subprocess.CalledProcessError:
raise HwsimSkip("veth not supported (kernel CONFIG_VETH)")
prefix = "macsec_psk_ns"
conffile = os.path.join(params['logdir'], prefix + ".conf")
pidfile = os.path.join(params['logdir'], prefix + ".pid")
logfile0 = os.path.join(params['logdir'], prefix + ".veth0.log")
logfile1 = os.path.join(params['logdir'], prefix + ".veth1.log")
cap_veth0 = os.path.join(params['logdir'], prefix + ".veth0.pcap")
cap_veth1 = os.path.join(params['logdir'], prefix + ".veth1.pcap")
cap_macsec0 = os.path.join(params['logdir'], prefix + ".macsec0.pcap")
cap_macsec1 = os.path.join(params['logdir'], prefix + ".macsec1.pcap")
for i in range(2):
try:
subprocess.check_call([ "ip", "netns", "add", "ns%d" % i ])
except subprocess.CalledProcessError:
raise HwsimSkip("network namespace not supported (kernel CONFIG_NAMESPACES, CONFIG_NET_NS)")
subprocess.check_call([ "ip", "link", "set", "veth%d" % i,
"netns", "ns%d" %i ])
subprocess.check_call([ "ip", "netns", "exec", "ns%d" % i,
"ip", "link", "set", "dev", "veth%d" % i,
"up" ])
cmd = {}
cmd[0] = subprocess.Popen(['ip', 'netns', 'exec', 'ns0',
'tcpdump', '-p', '-U', '-i', 'veth0',
'-w', cap_veth0, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
cmd[1] = subprocess.Popen(['ip', 'netns', 'exec', 'ns1',
'tcpdump', '-p', '-U', '-i', 'veth1',
'-w', cap_veth1, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
write_conf(conffile + '0')
write_conf(conffile + '1', mka_priority=100)
prg = os.path.join(params['logdir'],
'alt-wpa_supplicant/wpa_supplicant/wpa_supplicant')
if not os.path.exists(prg):
prg = '../../wpa_supplicant/wpa_supplicant'
arg = [ "ip", "netns", "exec", "ns0",
prg, '-BdddtKW', '-P', pidfile + '0', '-f', logfile0,
'-g', '/tmp/wpas-veth0',
'-Dmacsec_linux', '-c', conffile + '0', '-i', "veth0" ]
logger.info("Start wpa_supplicant: " + str(arg))
try:
subprocess.check_call(arg)
except subprocess.CalledProcessError:
raise HwsimSkip("macsec supported (wpa_supplicant CONFIG_MACSEC, CONFIG_DRIVER_MACSEC_LINUX; kernel CONFIG_MACSEC)")
if os.path.exists("wpa_supplicant-macsec2"):
logger.info("Use alternative wpa_supplicant binary for one of the macsec devices")
prg = "wpa_supplicant-macsec2"
arg = [ "ip", "netns", "exec", "ns1",
prg, '-BdddtKW', '-P', pidfile + '1', '-f', logfile1,
'-g', '/tmp/wpas-veth1',
'-Dmacsec_linux', '-c', conffile + '1', '-i', "veth1" ]
logger.info("Start wpa_supplicant: " + str(arg))
subprocess.check_call(arg)
wpas0 = WpaSupplicant('veth0', '/tmp/wpas-veth0')
wpas1 = WpaSupplicant('veth1', '/tmp/wpas-veth1')
log_ip_macsec_ns()
log_ip_link_ns()
logger.info("wpas0 STATUS:\n" + wpas0.request("STATUS"))
logger.info("wpas1 STATUS:\n" + wpas1.request("STATUS"))
logger.info("wpas0 STATUS-DRIVER:\n" + wpas0.request("STATUS-DRIVER"))
logger.info("wpas1 STATUS-DRIVER:\n" + wpas1.request("STATUS-DRIVER"))
macsec_ifname0 = wpas0.get_driver_status_field("parent_ifname")
macsec_ifname1 = wpas1.get_driver_status_field("parent_ifname")
for i in range(10):
key_tx0 = int(wpas0.get_status_field("Number of Keys Distributed"))
key_rx0 = int(wpas0.get_status_field("Number of Keys Received"))
key_tx1 = int(wpas1.get_status_field("Number of Keys Distributed"))
key_rx1 = int(wpas1.get_status_field("Number of Keys Received"))
if key_rx0 > 0 and key_tx1 > 0:
break
time.sleep(1)
cmd[2] = subprocess.Popen(['ip', 'netns', 'exec', 'ns0',
'tcpdump', '-p', '-U', '-i', macsec_ifname0,
'-w', cap_macsec0, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
cmd[3] = subprocess.Popen(['ip', 'netns', 'exec', 'ns0',
'tcpdump', '-p', '-U', '-i', macsec_ifname1,
'-w', cap_macsec1, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
time.sleep(0.5)
logger.info("wpas0 STATUS:\n" + wpas0.request("STATUS"))
logger.info("wpas1 STATUS:\n" + wpas1.request("STATUS"))
log_ip_macsec_ns()
hwsim_utils.test_connectivity(wpas0, wpas1,
ifname1=macsec_ifname0,
ifname2=macsec_ifname1,
send_len=1400)
log_ip_macsec_ns()
subprocess.check_call(['ip', 'netns', 'exec', 'ns0',
'ip', 'addr', 'add', '192.168.248.17/30',
'dev', macsec_ifname0])
subprocess.check_call(['ip', 'netns', 'exec', 'ns1',
'ip', 'addr', 'add', '192.168.248.18/30',
'dev', macsec_ifname1])
c = subprocess.Popen(['ip', 'netns', 'exec', 'ns0',
'ping', '-c', '2', '192.168.248.18'],
stdout=subprocess.PIPE)
res = c.stdout.read().decode()
c.stdout.close()
logger.info("ping:\n" + res)
if "2 packets transmitted, 2 received" not in res:
raise Exception("ping did not work")
wpas0.request("TERMINATE")
del wpas0
wpas1.request("TERMINATE")
del wpas1
time.sleep(1)
for i in range(len(cmd)):
cmd[i].terminate()
def test_macsec_psk_fail_cp(dev, apdev):
"""MACsec PSK local failures in CP state machine"""
try:
add_veth()
wpa = add_wpas_interfaces()
set_mka_psk_config(wpa[0])
with alloc_fail(wpa[0], 1, "sm_CP_RECEIVE_Enter"):
set_mka_psk_config(wpa[1])
wait_fail_trigger(wpa[0], "GET_ALLOC_FAIL", max_iter=100)
wait_mka_done(wpa)
finally:
cleanup_macsec()
def test_macsec_psk_fail_cp2(dev, apdev):
"""MACsec PSK local failures in CP state machine (2)"""
try:
add_veth()
wpa = add_wpas_interfaces()
set_mka_psk_config(wpa[0])
with alloc_fail(wpa[1], 1, "ieee802_1x_cp_sm_init"):
set_mka_psk_config(wpa[1])
wait_fail_trigger(wpa[1], "GET_ALLOC_FAIL", max_iter=100)
wait_mka_done(wpa)
finally:
cleanup_macsec()