From 517f76b158deb4a6c5376d49778ecd020c9bcd74 Mon Sep 17 00:00:00 2001 From: Jouni Malinen Date: Sun, 17 Mar 2019 18:37:56 +0200 Subject: [PATCH] tests: DPP use the wait_auth_success() helper function Use the already existing helper function and extend it to cover the most common test sequences. Signed-off-by: Jouni Malinen --- tests/hwsim/test_dpp.py | 499 +++++++--------------------------------- 1 file changed, 89 insertions(+), 410 deletions(-) diff --git a/tests/hwsim/test_dpp.py b/tests/hwsim/test_dpp.py index e845fa4a0..19272af76 100644 --- a/tests/hwsim/test_dpp.py +++ b/tests/hwsim/test_dpp.py @@ -206,22 +206,9 @@ def test_dpp_qr_code_curve_select(dev, apdev): res = dev[1].dpp_qr_code(uri) if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d" % res): raise Exception("Failed to initiate DPP Authentication") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") - ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - ev = dev[0].wait_event(["DPP-CONF-FAILED"], timeout=2) - if ev is None: - raise Exception("DPP configuration result not seen (Enrollee)") - ev = dev[1].wait_event(["DPP-CONF-SENT"], timeout=2) - if ev is None: - raise Exception("DPP configuration result not seen (Responder)") - dev[0].request("DPP_STOP_LISTEN") - dev[1].request("DPP_STOP_LISTEN") - dev[0].dump_monitor() - dev[1].dump_monitor() + wait_auth_success(dev[0], dev[1], configurator=dev[1], enrollee=dev[0], + allow_enrollee_failure=True, stop_responder=True, + stop_initiator=True) def test_dpp_qr_code_auth_broadcast(dev, apdev): """DPP QR Code and authentication exchange (broadcast)""" @@ -238,13 +225,7 @@ def test_dpp_qr_code_auth_broadcast(dev, apdev): dev[0].dpp_listen(2412) if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d" % id1): raise Exception("Failed to initiate DPP Authentication") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") - ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - dev[0].request("DPP_STOP_LISTEN") + wait_auth_success(dev[0], dev[1], stop_responder=True) def test_dpp_qr_code_auth_unicast(dev, apdev): """DPP QR Code and authentication exchange (unicast)""" @@ -314,28 +295,11 @@ def run_dpp_qr_code_auth_unicast(dev, apdev, curve, netrole=None, key=None, cmd += " configurator=%d" % conf_id if "OK" not in dev[1].request(cmd): raise Exception("Failed to initiate DPP Authentication") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") - ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - ev = dev[1].wait_event(["DPP-CONF-SENT", "DPP-CONF-FAILED"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Configurator)") - if "DPP-CONF-FAILED" in ev and not require_conf_failure: - raise Exception("Unexpected failure on Configurator") - ev = dev[0].wait_event(["DPP-CONF-RECEIVED", "DPP-CONF-FAILED"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Enrollee)") - if require_conf_success: - if "DPP-CONF-FAILED" in ev: - raise Exception("DPP configuration failed") - if require_conf_failure: - if "DPP-CONF-SUCCESS" in ev: - raise Exception("DPP configuration succeeded unexpectedly") - dev[0].request("DPP_STOP_LISTEN") - dev[0].dump_monitor() + wait_auth_success(dev[0], dev[1], configurator=dev[1], enrollee=dev[0], + allow_enrollee_failure=True, + allow_configurator_failure=not require_conf_success, + require_configurator_failure=require_conf_failure, + stop_responder=True) dev[1].dump_monitor() def test_dpp_qr_code_auth_mutual(dev, apdev): @@ -367,13 +331,7 @@ def test_dpp_qr_code_auth_mutual(dev, apdev): if "mutual=1" not in ev: raise Exception("Mutual authentication not used") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") - ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - dev[0].request("DPP_STOP_LISTEN") + wait_auth_success(dev[0], dev[1], stop_responder=True) def test_dpp_qr_code_auth_mutual2(dev, apdev): """DPP QR Code and authentication exchange (mutual2)""" @@ -411,13 +369,7 @@ def test_dpp_qr_code_auth_mutual2(dev, apdev): if "mutual=1" not in ev: raise Exception("Mutual authentication not used") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") - ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - dev[0].request("DPP_STOP_LISTEN") + wait_auth_success(dev[0], dev[1], stop_responder=True) def test_dpp_qr_code_auth_mutual_p_256(dev, apdev): """DPP QR Code and authentication exchange (mutual, autogen P-256)""" @@ -476,13 +428,7 @@ def run_dpp_qr_code_auth_mutual(dev, apdev, curve): if "mutual=1" not in ev: raise Exception("Mutual authentication not used") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") - ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - dev[0].request("DPP_STOP_LISTEN") + wait_auth_success(dev[0], dev[1], stop_responder=True) def test_dpp_auth_resp_retries(dev, apdev): """DPP Authentication Response retries""" @@ -565,13 +511,7 @@ def test_dpp_qr_code_auth_mutual_not_used(dev, apdev): if "mutual=0" not in ev: raise Exception("Mutual authentication not used") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") - ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - dev[0].request("DPP_STOP_LISTEN") + wait_auth_success(dev[0], dev[1], stop_responder=True) def test_dpp_qr_code_auth_mutual_curve_mismatch(dev, apdev): """DPP QR Code and authentication exchange (mutual/mismatch)""" @@ -627,13 +567,7 @@ def test_dpp_qr_code_auth_hostapd_mutual2(dev, apdev): logger.info("AP scans QR Code") hapd.dpp_qr_code(uri0) - ev = hapd.wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - hapd.request("DPP_STOP_LISTEN") + wait_auth_success(hapd, dev[0], stop_responder=True) def test_dpp_qr_code_listen_continue(dev, apdev): """DPP QR Code and listen operation needing continuation""" @@ -652,13 +586,7 @@ def test_dpp_qr_code_listen_continue(dev, apdev): logger.info("dev1 initiates DPP Authentication") if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d" % id1): raise Exception("Failed to initiate DPP Authentication") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") - ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - dev[0].request("DPP_STOP_LISTEN") + wait_auth_success(dev[0], dev[1], stop_responder=True) def test_dpp_qr_code_auth_initiator_enrollee(dev, apdev): """DPP QR Code and authentication exchange (Initiator in Enrollee role)""" @@ -684,21 +612,8 @@ def run_dpp_qr_code_auth_initiator_enrollee(dev, apdev): dev[0].dpp_listen(2412) if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d role=enrollee" % id1): raise Exception("Failed to initiate DPP Authentication") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") - ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - - ev = dev[0].wait_event(["DPP-CONF-SENT"], timeout=5) - if ev is None: - raise Exception("DPP configuration did not succeed (Configurator)") - ev = dev[1].wait_event(["DPP-CONF-FAILED"], timeout=5) - if ev is None: - raise Exception("DPP configuration did not succeed (Enrollee)") - - dev[0].request("DPP_STOP_LISTEN") + wait_auth_success(dev[0], dev[1], configurator=dev[0], enrollee=dev[1], + allow_enrollee_failure=True, stop_responder=True) def test_dpp_qr_code_auth_initiator_either_1(dev, apdev): """DPP QR Code and authentication exchange (Initiator in either role)""" @@ -729,21 +644,9 @@ def run_dpp_qr_code_auth_initiator_either(dev, apdev, resp_role, dev[0].dpp_listen(2412, role=resp_role) if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d role=either" % id1): raise Exception("Failed to initiate DPP Authentication") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") - ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - - ev = conf_dev.wait_event(["DPP-CONF-SENT"], timeout=5) - if ev is None: - raise Exception("DPP configuration did not succeed (Configurator)") - ev = enrollee_dev.wait_event(["DPP-CONF-FAILED"], timeout=5) - if ev is None: - raise Exception("DPP configuration did not succeed (Enrollee)") - - dev[0].request("DPP_STOP_LISTEN") + wait_auth_success(dev[0], dev[1], configurator=conf_dev, + enrollee=enrollee_dev, allow_enrollee_failure=True, + stop_responder=True) def run_init_incompatible_roles(dev, role="enrollee"): check_dpp_capab(dev[0]) @@ -773,13 +676,7 @@ def test_dpp_qr_code_auth_incompatible_roles(dev, apdev): if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d role=configurator" % id1): raise Exception("Failed to initiate DPP Authentication") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") - ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - dev[0].request("DPP_STOP_LISTEN") + wait_auth_success(dev[0], dev[1], stop_responder=True) def test_dpp_qr_code_auth_incompatible_roles2(dev, apdev): """DPP QR Code and authentication exchange (incompatible roles 2)""" @@ -900,22 +797,8 @@ def test_dpp_qr_code_auth_neg_chan(dev, apdev): if "freq=2462 result=SUCCESS" not in ev: raise Exception("Unexpected TX status for Authentication Confirm: " + ev) - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") - ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - ev = dev[1].wait_event(["DPP-CONF-SENT"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Configurator)") - ev = dev[0].wait_event(["DPP-CONF-RECEIVED", "DPP-CONF-FAILED"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Enrollee)") - if "DPP-CONF-FAILED" in ev: - raise Exception("DPP configuration failed") - dev[0].request("DPP_STOP_LISTEN") - dev[0].dump_monitor() + wait_auth_success(dev[0], dev[1], configurator=dev[1], enrollee=dev[0], + stop_responder=True) dev[1].dump_monitor() def test_dpp_config_legacy(dev, apdev): @@ -1532,12 +1415,7 @@ def test_dpp_gas_timeout(dev, apdev): msg['freq'], msg['datarate'], msg['ssi_signal'], binascii.hexlify(msg['frame']).decode())): raise Exception("MGMT_RX_PROCESS failed") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") - ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") + wait_auth_success(dev[0], dev[1]) # DPP Configuration Response (GAS Initial Response frame) msg = dev[0].mgmt_rx() @@ -1803,21 +1681,7 @@ def run_dpp_ap_config(dev, apdev, curve=None, conf_curve=None, cmd = "DPP_AUTH_INIT peer=%d conf=ap-dpp configurator=%d" % (id, conf_id) if "OK" not in dev[0].request(cmd): raise Exception("Failed to initiate DPP Authentication") - ev = hapd.wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - ev = dev[0].wait_event(["DPP-CONF-SENT"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Configurator)") - ev = hapd.wait_event(["DPP-CONF-RECEIVED", "DPP-CONF-FAILED"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Enrollee)") - if "DPP-CONF-FAILED" in ev: - raise Exception("DPP configuration failed") - + wait_auth_success(hapd, dev[0], configurator=dev[0], enrollee=hapd) update_hapd_config(hapd) id1 = dev[1].dpp_bootstrap_gen(chan="81/1", mac=True, curve=curve) @@ -1842,19 +1706,8 @@ def run_dpp_ap_config(dev, apdev, curve=None, conf_curve=None, cmd = "DPP_AUTH_INIT peer=%d conf=sta-dpp configurator=%d" % (id0b, conf_id) if "OK" not in dev[0].request(cmd): raise Exception("Failed to initiate DPP Authentication") - ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - ev = dev[0].wait_event(["DPP-CONF-SENT"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Configurator)") - ev = dev[1].wait_event(["DPP-CONF-RECEIVED"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Enrollee)") - dev[1].request("DPP_STOP_LISTEN") + wait_auth_success(dev[1], dev[0], configurator=dev[0], enrollee=dev[1], + stop_responder=True) ev = dev[1].wait_event(["DPP-CONFOBJ-SSID"], timeout=1) if ev is None: @@ -2129,20 +1982,8 @@ def run_dpp_qr_code_auth_responder_configurator(dev, apdev, extra): cmd = "DPP_AUTH_INIT peer=%d role=enrollee" % id1 if "OK" not in dev[1].request(cmd): raise Exception("Failed to initiate DPP Authentication") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") - ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - ev = dev[0].wait_event(["DPP-CONF-SENT"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Configurator)") - ev = dev[1].wait_event(["DPP-CONF-RECEIVED"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Enrollee)") - dev[0].request("DPP_STOP_LISTEN") - dev[0].dump_monitor() + wait_auth_success(dev[0], dev[1], configurator=dev[0], enrollee=dev[1], + stop_responder=True) dev[1].dump_monitor() def test_dpp_qr_code_hostapd_init(dev, apdev): @@ -2169,20 +2010,8 @@ def test_dpp_qr_code_hostapd_init(dev, apdev): cmd = "DPP_AUTH_INIT peer=%d role=enrollee" % id1 if "OK" not in hapd.request(cmd): raise Exception("Failed to initiate DPP Authentication") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") - ev = hapd.wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - ev = dev[0].wait_event(["DPP-CONF-SENT"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Configurator)") - ev = hapd.wait_event(["DPP-CONF-RECEIVED"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Enrollee)") - dev[0].request("DPP_STOP_LISTEN") - dev[0].dump_monitor() + wait_auth_success(dev[0], hapd, configurator=dev[0], enrollee=hapd, + stop_responder=True) def test_dpp_qr_code_hostapd_init_offchannel(dev, apdev): """DPP QR Code and hostapd as initiator (offchannel)""" @@ -2217,20 +2046,8 @@ def run_dpp_qr_code_hostapd_init_offchannel(dev, apdev, extra): cmd += " " + extra if "OK" not in hapd.request(cmd): raise Exception("Failed to initiate DPP Authentication") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") - ev = hapd.wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - ev = dev[0].wait_event(["DPP-CONF-SENT"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Configurator)") - ev = hapd.wait_event(["DPP-CONF-RECEIVED"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Enrollee)") - dev[0].request("DPP_STOP_LISTEN") - dev[0].dump_monitor() + wait_auth_success(dev[0], hapd, configurator=dev[0], enrollee=hapd, + stop_responder=True) def test_dpp_test_vector_p_256(dev, apdev): """DPP P-256 test vector (mutual auth)""" @@ -2268,13 +2085,7 @@ def test_dpp_test_vector_p_256(dev, apdev): cmd = "DPP_AUTH_INIT peer=%d own=%d neg_freq=2412" % (id1peer, id1) if "OK" not in dev[1].request(cmd): raise Exception("Failed to initiate operation") - - ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") + wait_auth_success(dev[0], dev[1]) def test_dpp_test_vector_p_256_b(dev, apdev): """DPP P-256 test vector (Responder-only auth)""" @@ -2311,13 +2122,7 @@ def test_dpp_test_vector_p_256_b(dev, apdev): cmd = "DPP_AUTH_INIT peer=%d own=%d neg_freq=2412" % (id1peer, id1) if "OK" not in dev[1].request(cmd): raise Exception("Failed to initiate operation") - - ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") + wait_auth_success(dev[0], dev[1]) def der_priv_key_p_521(priv): if len(priv) != 2 * 66: @@ -2363,13 +2168,7 @@ def test_dpp_test_vector_p_521(dev, apdev): cmd = "DPP_AUTH_INIT peer=%d own=%d neg_freq=2412" % (id1peer, id1) if "OK" not in dev[1].request(cmd): raise Exception("Failed to initiate operation") - - ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") + wait_auth_success(dev[0], dev[1]) def test_dpp_pkex(dev, apdev): """DPP and PKEX""" @@ -2467,20 +2266,9 @@ def run_dpp_pkex(dev, apdev, curve=None, init_extra="", check_config=False, raise Exception("DPP authentication succeeded") return - ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") - - if check_config: - ev = dev[1].wait_event(["DPP-CONF-SENT"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Configurator)") - ev = dev[0].wait_event(["DPP-CONF-RECEIVED"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Enrollee)") + wait_auth_success(dev[0], dev[1], + configurator=dev[1] if check_config else None, + enrollee=dev[0] if check_config else None) def test_dpp_pkex_5ghz(dev, apdev): """DPP and PKEX on 5 GHz""" @@ -2516,12 +2304,7 @@ def run_dpp_pkex_5ghz(dev, apdev): if "FAIL" in res: raise Exception("Failed to set PKEX data (initiator)") - ev = dev[1].wait_event(["DPP-AUTH-SUCCESS", "DPP-FAIL"], timeout=20) - if ev is None or "DPP-AUTH-SUCCESS" not in ev: - raise Exception("DPP authentication did not succeed (Initiator)") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") + wait_auth_success(dev[0], dev[1], timeout=20) def test_dpp_pkex_test_vector(dev, apdev): """DPP and PKEX (P-256) test vector""" @@ -2580,13 +2363,7 @@ def test_dpp_pkex_test_vector(dev, apdev): res = dev[1].request(cmd) if "FAIL" in res: raise Exception("Failed to set PKEX data (initiator)") - - ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") + wait_auth_success(dev[0], dev[1]) def test_dpp_pkex_code_mismatch(dev, apdev): """DPP and PKEX with mismatching code""" @@ -2620,13 +2397,7 @@ def test_dpp_pkex_code_mismatch(dev, apdev): res = dev[1].request(cmd) if "FAIL" in res: raise Exception("Failed to set PKEX data (initiator, retry)") - - ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator, retry)") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder, retry)") + wait_auth_success(dev[0], dev[1]) def test_dpp_pkex_code_mismatch_limit(dev, apdev): """DPP and PKEX with mismatching code limit""" @@ -2799,20 +2570,7 @@ def run_dpp_pkex2(dev, apdev, curve=None, init_extra=""): res = dev[1].request(cmd) if "FAIL" in res: raise Exception("Failed to set PKEX data (initiator)") - - ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") - - ev = dev[0].wait_event(["DPP-CONF-SENT"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Configurator)") - ev = dev[1].wait_event(["DPP-CONF-RECEIVED"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Enrollee)") + wait_auth_success(dev[0], dev[1], configurator=dev[0], enrollee=dev[1]) def test_dpp_pkex_no_responder(dev, apdev): """DPP and PKEX with no responder (retry behavior)""" @@ -2854,17 +2612,8 @@ def test_dpp_pkex_after_retry(dev, apdev): if "FAIL" in res: raise Exception("Failed to set PKEX data (responder)") dev[1].dpp_listen(2437) - - ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=10) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - ev = dev[0].wait_event(["DPP-CONF-SENT"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Configurator)") - # Ignore Enrollee result since configurator was not set here + wait_auth_success(dev[1], dev[0], configurator=dev[0], enrollee=dev[1], + allow_enrollee_failure=True) def test_dpp_pkex_hostapd_responder(dev, apdev): """DPP PKEX with hostapd as responder""" @@ -2892,21 +2641,8 @@ def test_dpp_pkex_hostapd_responder(dev, apdev): res = dev[0].request(cmd) if "FAIL" in res: raise Exception("Failed to set PKEX data (initiator/wpa_supplicant)") - - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - ev = hapd.wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") - ev = dev[0].wait_event(["DPP-CONF-SENT"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Configurator)") - ev = hapd.wait_event(["DPP-CONF-RECEIVED"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Enrollee)") - dev[0].request("DPP_STOP_LISTEN") - dev[0].dump_monitor() + wait_auth_success(hapd, dev[0], configurator=dev[0], enrollee=hapd, + stop_initiator=True) def test_dpp_pkex_hostapd_initiator(dev, apdev): """DPP PKEX with hostapd as initiator""" @@ -2938,21 +2674,8 @@ def test_dpp_pkex_hostapd_initiator(dev, apdev): res = hapd.request(cmd) if "FAIL" in res: raise Exception("Failed to set PKEX data (initiator/hostapd)") - - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - ev = hapd.wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") - ev = dev[0].wait_event(["DPP-CONF-SENT"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Configurator)") - ev = hapd.wait_event(["DPP-CONF-RECEIVED"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Enrollee)") - dev[0].request("DPP_STOP_LISTEN") - dev[0].dump_monitor() + wait_auth_success(hapd, dev[0], configurator=dev[0], enrollee=hapd, + stop_initiator=True) def test_dpp_hostapd_configurator(dev, apdev): """DPP with hostapd as configurator/initiator""" @@ -2984,21 +2707,8 @@ def test_dpp_hostapd_configurator(dev, apdev): cmd = "DPP_AUTH_INIT peer=%d configurator=%d conf=sta-dpp" % (id1, conf_id) if "OK" not in hapd.request(cmd): raise Exception("Failed to initiate DPP Authentication") - - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") - ev = hapd.wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - ev = hapd.wait_event(["DPP-CONF-SENT"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Configurator)") - ev = dev[0].wait_event(["DPP-CONF-RECEIVED"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Enrollee)") - dev[0].request("DPP_STOP_LISTEN") - dev[0].dump_monitor() + wait_auth_success(dev[0], hapd, configurator=hapd, enrollee=dev[0], + stop_responder=True) def test_dpp_hostapd_configurator_responder(dev, apdev): """DPP with hostapd as configurator/responder""" @@ -3024,21 +2734,8 @@ def test_dpp_hostapd_configurator_responder(dev, apdev): cmd = "DPP_AUTH_INIT peer=%d role=enrollee" % (id1) if "OK" not in dev[0].request(cmd): raise Exception("Failed to initiate DPP Authentication") - - ev = hapd.wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - ev = hapd.wait_event(["DPP-CONF-SENT"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Configurator)") - ev = dev[0].wait_event(["DPP-CONF-RECEIVED"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Enrollee)") - dev[0].request("DPP_STOP_LISTEN") - dev[0].dump_monitor() + wait_auth_success(hapd, dev[0], configurator=hapd, enrollee=dev[0], + stop_initiator=True) def test_dpp_own_config(dev, apdev): """DPP configurator signing own connector""" @@ -3081,19 +2778,7 @@ def run_dpp_own_config(dev, apdev, own_curve=None, expect_failure=False, cmd = "DPP_AUTH_INIT peer=%d conf=ap-dpp configurator=%d%s" % (id, conf_id, extra) if "OK" not in dev[0].request(cmd): raise Exception("Failed to initiate DPP Authentication") - ev = hapd.wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - ev = dev[0].wait_event(["DPP-CONF-SENT"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Configurator)") - ev = hapd.wait_event(["DPP-CONF-RECEIVED"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Enrollee)") - + wait_auth_success(hapd, dev[0], configurator=dev[0], enrollee=hapd) update_hapd_config(hapd) dev[0].set("dpp_config_processing", "1") @@ -3180,18 +2865,7 @@ def run_dpp_own_config_ap(dev, apdev, reconf_configurator=False, extra=""): cmd = "DPP_AUTH_INIT peer=%d conf=sta-dpp configurator=%d%s" % (id, conf_id, extra) if "OK" not in hapd.request(cmd): raise Exception("Failed to initiate DPP Authentication") - ev = hapd.wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - ev = hapd.wait_event(["DPP-CONF-SENT"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Configurator)") - ev = dev[0].wait_event(["DPP-CONF-RECEIVED", "DPP-CONF-FAILED"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Enrollee)") - if "DPP-CONF-RECEIVED" not in ev: - raise Exception("DPP configuration failed (Enrollee)") - + wait_auth_success(dev[0], hapd, configurator=hapd, enrollee=dev[0]) dev[0].wait_connected() def test_dpp_intro_mismatch(dev, apdev): @@ -3890,12 +3564,7 @@ def test_dpp_proto_stop_at_auth_conf(dev, apdev): def test_dpp_proto_stop_at_auth_conf_tx(dev, apdev): """DPP protocol testing - stop when transmitting Auth Conf (Registrar)""" run_dpp_proto_init(dev, 1, 89, init_enrollee=True) - ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=10) - if ev is None: - raise Exception("Authentication did not succeed (Initiator)") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("Authentication did not succeed (Responder)") + wait_auth_success(dev[0], dev[1], timeout=10) ev = dev[1].wait_event(["GAS-QUERY-START"], timeout=0.1) if ev is not None: raise Exception("Unexpected GAS query") @@ -3906,12 +3575,7 @@ def test_dpp_proto_stop_at_auth_conf_tx(dev, apdev): def test_dpp_proto_stop_at_auth_conf_tx2(dev, apdev): """DPP protocol testing - stop when transmitting Auth Conf (Enrollee)""" run_dpp_proto_init(dev, 1, 89) - ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=10) - if ev is None: - raise Exception("Authentication did not succeed (Initiator)") - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("Authentication did not succeed (Responder)") + wait_auth_success(dev[0], dev[1], timeout=10) ev = dev[0].wait_event(["GAS-QUERY-DONE"], timeout=5) if ev is None or "result=TIMEOUT" not in ev: @@ -4254,18 +3918,9 @@ def run_dpp_qr_code_chan_list(dev, apdev, unicast, listen_freq, chanlist, raise Exception("Failed to initiate DPP Authentication") if no_wait: return - ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=timeout) - if ev is None: - raise Exception("DPP authentication did not succeed (Responder)") - ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: - raise Exception("DPP authentication did not succeed (Initiator)") - ev = dev[0].wait_event(["DPP-CONF-RECEIVED", "DPP-CONF-FAILED"], timeout=5) - if ev is None: - raise Exception("DPP configuration not completed (Enrollee)") - dev[0].request("DPP_STOP_LISTEN") - dev[0].dump_monitor() - dev[1].dump_monitor() + wait_auth_success(dev[0], dev[1], timeout=timeout, configurator=dev[1], + enrollee=dev[0], allow_enrollee_failure=True, + stop_responder=True) def test_dpp_qr_code_chan_list_no_match(dev, apdev): """DPP QR Code and no matching supported channel""" @@ -4540,13 +4195,37 @@ def rx_process_frame(dev): raise Exception("MGMT_RX_PROCESS failed") return msg -def wait_auth_success(responder, initiator): - ev = responder.wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: +def wait_auth_success(responder, initiator, configurator=None, enrollee=None, + allow_enrollee_failure=False, + allow_configurator_failure=False, + require_configurator_failure=False, + timeout=5, stop_responder=False, stop_initiator=False): + ev = responder.wait_event(["DPP-AUTH-SUCCESS", "DPP-FAIL"], timeout=timeout) + if ev is None or "DPP-AUTH-SUCCESS" not in ev: raise Exception("DPP authentication did not succeed (Responder)") - ev = initiator.wait_event(["DPP-AUTH-SUCCESS"], timeout=5) - if ev is None: + ev = initiator.wait_event(["DPP-AUTH-SUCCESS", "DPP-FAIL"], timeout=5) + if ev is None or "DPP-AUTH-SUCCESS" not in ev: raise Exception("DPP authentication did not succeed (Initiator)") + if configurator: + ev = configurator.wait_event(["DPP-CONF-SENT", + "DPP-CONF-FAILED"], timeout=5) + if ev is None: + raise Exception("DPP configuration not completed (Configurator)") + if "DPP-CONF-FAILED" in ev and not allow_configurator_failure: + raise Exception("DPP configuration did not succeed (Configurator") + if "DPP-CONF-SUCCESS" in ev and not require_configurator_failure: + raise Exception("DPP configuration succeeded (Configurator") + if enrollee: + ev = enrollee.wait_event(["DPP-CONF-RECEIVED", + "DPP-CONF-FAILED"], timeout=5) + if ev is None: + raise Exception("DPP configuration not completed (Enrollee)") + if "DPP-CONF-FAILED" in ev and not allow_enrollee_failure: + raise Exception("DPP configuration did not succeed (Enrollee)") + if stop_responder: + responder.request("DPP_STOP_LISTEN") + if stop_initiator: + initiator.request("DPP_STOP_LISTEN") def wait_conf_completion(configurator, enrollee): ev = configurator.wait_event(["DPP-CONF-SENT"], timeout=5)