diff --git a/tests/hwsim/hwsim_utils.py b/tests/hwsim/hwsim_utils.py index 1fca4a2d2..7a82df6e8 100644 --- a/tests/hwsim/hwsim_utils.py +++ b/tests/hwsim/hwsim_utils.py @@ -13,7 +13,7 @@ from wpasupplicant import WpaSupplicant def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False, ifname1=None, ifname2=None, config=True, timeout=5, - multicast_to_unicast=False): + multicast_to_unicast=False, broadcast=True): addr1 = dev1.own_addr() if not dev1group and isinstance(dev1, WpaSupplicant): addr1 = dev1.get_driver_status_field('addr') @@ -66,26 +66,27 @@ def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False, if "DATA-TEST-RX {} {}".format(addr2, addr1) not in ev: raise Exception("Unexpected dev1->dev2 unicast data result") - cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr1, tos) - for i in xrange(broadcast_retry_c): - try: - if dev1group: - dev1.group_request(cmd) - else: - dev1.request(cmd) - if dev2group: - ev = dev2.wait_group_event(["DATA-TEST-RX"], - timeout=timeout) - else: - ev = dev2.wait_event(["DATA-TEST-RX"], timeout=timeout) - if ev is None: - raise Exception("dev1->dev2 broadcast data delivery failed") - if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) not in ev: - raise Exception("Unexpected dev1->dev2 broadcast data result") - break - except Exception as e: - if i == broadcast_retry_c - 1: - raise + if broadcast: + cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr1, tos) + for i in xrange(broadcast_retry_c): + try: + if dev1group: + dev1.group_request(cmd) + else: + dev1.request(cmd) + if dev2group: + ev = dev2.wait_group_event(["DATA-TEST-RX"], + timeout=timeout) + else: + ev = dev2.wait_event(["DATA-TEST-RX"], timeout=timeout) + if ev is None: + raise Exception("dev1->dev2 broadcast data delivery failed") + if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) not in ev: + raise Exception("Unexpected dev1->dev2 broadcast data result") + break + except Exception as e: + if i == broadcast_retry_c - 1: + raise cmd = "DATA_TEST_TX {} {} {}".format(addr1, addr2, tos) if dev2group: @@ -101,32 +102,33 @@ def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False, if "DATA-TEST-RX {} {}".format(addr1, addr2) not in ev: raise Exception("Unexpected dev2->dev1 unicast data result") - cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr2, tos) - for i in xrange(broadcast_retry_c): - try: - if dev2group: - dev2.group_request(cmd) - else: - dev2.request(cmd) - if dev1group: - ev = dev1.wait_group_event(["DATA-TEST-RX"], - timeout=timeout) - else: - ev = dev1.wait_event(["DATA-TEST-RX"], timeout=timeout) - if ev is None: - raise Exception("dev2->dev1 broadcast data delivery failed") - if multicast_to_unicast: - if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr2) in ev: - raise Exception("Unexpected dev2->dev1 broadcast data result: multicast to unicast conversion missing") - if "DATA-TEST-RX {} {}".format(addr1, addr2) not in ev: - raise Exception("Unexpected dev2->dev1 broadcast data result (multicast to unicast enabled)") - else: - if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr2) not in ev: - raise Exception("Unexpected dev2->dev1 broadcast data result") - break - except Exception as e: - if i == broadcast_retry_c - 1: - raise + if broadcast: + cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr2, tos) + for i in xrange(broadcast_retry_c): + try: + if dev2group: + dev2.group_request(cmd) + else: + dev2.request(cmd) + if dev1group: + ev = dev1.wait_group_event(["DATA-TEST-RX"], + timeout=timeout) + else: + ev = dev1.wait_event(["DATA-TEST-RX"], timeout=timeout) + if ev is None: + raise Exception("dev2->dev1 broadcast data delivery failed") + if multicast_to_unicast: + if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr2) in ev: + raise Exception("Unexpected dev2->dev1 broadcast data result: multicast to unicast conversion missing") + if "DATA-TEST-RX {} {}".format(addr1, addr2) not in ev: + raise Exception("Unexpected dev2->dev1 broadcast data result (multicast to unicast enabled)") + else: + if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr2) not in ev: + raise Exception("Unexpected dev2->dev1 broadcast data result") + break + except Exception as e: + if i == broadcast_retry_c - 1: + raise finally: if config: if dev1group: @@ -141,7 +143,8 @@ def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False, def test_connectivity(dev1, dev2, dscp=None, tos=None, max_tries=1, dev1group=False, dev2group=False, ifname1=None, ifname2=None, config=True, timeout=5, - multicast_to_unicast=False, success_expected=True): + multicast_to_unicast=False, success_expected=True, + broadcast=True): if dscp: tos = dscp << 2 if not tos: @@ -154,7 +157,8 @@ def test_connectivity(dev1, dev2, dscp=None, tos=None, max_tries=1, run_connectivity_test(dev1, dev2, tos, dev1group, dev2group, ifname1, ifname2, config=config, timeout=timeout, - multicast_to_unicast=multicast_to_unicast) + multicast_to_unicast=multicast_to_unicast, + broadcast=broadcast) success = True break except Exception, e: diff --git a/tests/hwsim/test_ap_ciphers.py b/tests/hwsim/test_ap_ciphers.py index 978b54dd6..a56ca7df3 100644 --- a/tests/hwsim/test_ap_ciphers.py +++ b/tests/hwsim/test_ap_ciphers.py @@ -558,3 +558,32 @@ def run_ap_wpa2_delayed_group_m1_retransmission(dev, apdev): a = int(after[i], 16) if a < b: raise Exception("RX counter decreased: idx=%d before=%d after=%d" % (i, b, a)) + +def test_ap_wpa2_delayed_m1_m3_zero_tk(dev, apdev): + """Delayed M1+M3 retransmission and zero TK""" + params = hostapd.wpa2_params(ssid="test-wpa2-psk", passphrase="12345678") + hapd = hostapd.add_ap(apdev[0], params) + + Wlantest.setup(hapd) + wt = Wlantest() + wt.flush() + wt.add_passphrase("12345678") + + dev[0].connect("test-wpa2-psk", psk="12345678", scan_freq="2412") + + hwsim_utils.test_connectivity(dev[0], hapd) + addr = dev[0].own_addr() + if "OK" not in hapd.request("RESEND_M1 " + addr + " change-anonce"): + raise Exception("RESEND_M1 failed") + if "OK" not in hapd.request("RESEND_M1 " + addr): + raise Exception("RESEND_M1 failed") + if "OK" not in hapd.request("RESEND_M3 " + addr): + raise Exception("RESEND_M3 failed") + + if "OK" not in hapd.request("SET_KEY 3 %s %d %d %s %s" % (addr, 0, 1, 6*"00", 16*"00")): + raise Exception("SET_KEY failed") + time.sleep(0.1) + hwsim_utils.test_connectivity(dev[0], hapd, timeout=1, broadcast=False, + success_expected=False) + dev[0].request("DISCONNECT") + dev[0].wait_disconnected()