tests: Use more robust way to determine MKA is done for MACsec testing

Signed-off-by: Jouni Malinen <j@w1.fi>
This commit is contained in:
Jouni Malinen 2018-12-29 11:37:01 +02:00
parent 8c652ecfbe
commit 0d09bd0832

View file

@ -171,23 +171,57 @@ def add_wpas_interfaces(count=2):
return wpa return wpa
def wait_key_distribution(wpas0, wpas1, expect_failure=False): def lower_addr(addr1, addr2):
a1 = addr1.split(':')
a2 = addr2.split(':')
for i in range(6):
if a1[i].decode("hex") < a2[i].decode("hex"):
return True
if a1[i].decode("hex") > a2[i].decode("hex"):
return False
return False
def wait_mka_done(wpa, expect_failure=False):
max_iter = 14 if expect_failure else 40 max_iter = 14 if expect_failure else 40
for i in range(max_iter): for i in range(max_iter):
key_tx0 = int(wpas0.get_status_field("Number of Keys Distributed")) done = True
key_rx0 = int(wpas0.get_status_field("Number of Keys Received")) for w in wpa:
key_tx1 = int(wpas1.get_status_field("Number of Keys Distributed")) secured = w.get_status_field("Secured")
key_rx1 = int(wpas1.get_status_field("Number of Keys Received")) peers = int(w.get_status_field("live_peers"))
if (key_tx0 > 0 or key_rx0 > 0) and (key_tx1 > 0 or key_rx1 > 0): if expect_failure and (secured == "Yes" or peers > 0):
return raise Exception("MKA completed unexpectedly")
if peers != len(wpa) - 1 or secured != "Yes":
done = False
break
w.dump_monitor()
if done:
break
time.sleep(0.5) time.sleep(0.5)
if expect_failure: if expect_failure:
if key_tx0 != 0 or key_rx0 != 0 or key_tx1 != 0 or key_rx1 != 0:
raise Exception("Unexpected key distribution")
return return
raise Exception("No key distribution seen") if not done:
raise Exception("MKA not completed successfully")
key_server = None
ks_prio = 999
for w in wpa:
logger.info("%s STATUS:\n%s" % (w.ifname, w.request("STATUS")))
addr = w.get_status_field("address")
prio = int(w.get_status_field("Actor Priority"))
if key_server is None or prio < ks_prio or \
(prio == ks_prio and lower_addr(addr, ks_addr)):
key_server = w
ks_addr = addr
ks_prio = prio
logger.info("Expected key server: " + key_server.ifname)
if key_server.get_status_field("is_key_server") != "Yes":
raise Exception("Expected key server was not elected")
for w in wpa:
if w != key_server and w.get_status_field("is_key_server") == "Yes":
raise Exception("Unexpected key server")
def run_macsec_psk(dev, apdev, params, prefix, integ_only=False, port0=None, def run_macsec_psk(dev, apdev, params, prefix, integ_only=False, port0=None,
port1=None, ckn0=None, ckn1=None, cak0=None, cak1=None, port1=None, ckn0=None, ckn1=None, cak0=None, cak1=None,
@ -232,7 +266,7 @@ def run_macsec_psk(dev, apdev, params, prefix, integ_only=False, port0=None,
macsec_ifname0 = wpas0.get_driver_status_field("parent_ifname") macsec_ifname0 = wpas0.get_driver_status_field("parent_ifname")
macsec_ifname1 = wpas1.get_driver_status_field("parent_ifname") macsec_ifname1 = wpas1.get_driver_status_field("parent_ifname")
wait_key_distribution(wpas0, wpas1, expect_failure=expect_failure) wait_mka_done(wpa, expect_failure=expect_failure)
if expect_failure: if expect_failure:
for i in range(len(cmd)): for i in range(len(cmd)):
@ -500,7 +534,7 @@ def test_macsec_psk_fail_cp(dev, apdev):
set_mka_psk_config(wpa[1]) set_mka_psk_config(wpa[1])
wait_fail_trigger(wpa[0], "GET_ALLOC_FAIL", max_iter=100) wait_fail_trigger(wpa[0], "GET_ALLOC_FAIL", max_iter=100)
wait_key_distribution(wpa[0], wpa[1]) wait_mka_done(wpa)
finally: finally:
cleanup_macsec() cleanup_macsec()
@ -514,6 +548,6 @@ def test_macsec_psk_fail_cp2(dev, apdev):
set_mka_psk_config(wpa[1]) set_mka_psk_config(wpa[1])
wait_fail_trigger(wpa[1], "GET_ALLOC_FAIL", max_iter=100) wait_fail_trigger(wpa[1], "GET_ALLOC_FAIL", max_iter=100)
wait_key_distribution(wpa[0], wpa[1]) wait_mka_done(wpa)
finally: finally:
cleanup_macsec() cleanup_macsec()