diff --git a/tests/hwsim/test_pmksa_cache.py b/tests/hwsim/test_pmksa_cache.py index 78b0dba4b..e1d0d3c41 100644 --- a/tests/hwsim/test_pmksa_cache.py +++ b/tests/hwsim/test_pmksa_cache.py @@ -4,15 +4,18 @@ # This software may be distributed under the terms of the BSD license. # See README for more details. +import binascii import logging logger = logging.getLogger() +import socket +import struct import subprocess import time import hostapd import hwsim_utils from wpasupplicant import WpaSupplicant -from utils import alloc_fail, HwsimSkip +from utils import alloc_fail, HwsimSkip, wait_fail_trigger from test_ap_eap import eap_connect def test_pmksa_cache_on_roam_back(dev, apdev): @@ -979,3 +982,97 @@ def test_pmksa_cache_ctrl_ext(dev, apdev): raise Exception("Connection with the AP timed out") if "CTRL-EVENT-EAP-STARTED" in ev: raise Exception("Unexpected EAP exchange after external PMKSA cache restore") + +def test_rsn_preauth_processing(dev, apdev): + """RSN pre-authentication processing on AP""" + params = hostapd.wpa2_eap_params(ssid="test-wpa2-eap") + params['rsn_preauth'] = '1' + params['rsn_preauth_interfaces'] = "lo" + hapd = hostapd.add_ap(apdev[0], params) + bssid = hapd.own_addr() + _bssid = binascii.unhexlify(bssid.replace(':', '')) + eap_connect(dev[0], hapd, "PAX", "pax.user@example.com", + password_hex="0123456789abcdef0123456789abcdef") + addr = dev[0].own_addr() + _addr = binascii.unhexlify(addr.replace(':', '')) + + sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, + socket.htons(0x88c7)) + sock.bind(("lo", socket.htons(0x88c7))) + + foreign = "\x02\x03\x04\x05\x06\x07" + proto = "\x88\xc7" + tests = [] + # RSN: too short pre-auth packet (len=14) + tests += [ _bssid + foreign + proto ] + # Not EAPOL-Start + tests += [ _bssid + foreign + proto + struct.pack('>BBH', 0, 0, 0) ] + # RSN: pre-auth for foreign address 02:03:04:05:06:07 + tests += [ foreign + foreign + proto + struct.pack('>BBH', 0, 0, 0) ] + # RSN: pre-auth for already association STA 02:00:00:00:00:00 + tests += [ _bssid + _addr + proto + struct.pack('>BBH', 0, 0, 0) ] + # New STA + tests += [ _bssid + foreign + proto + struct.pack('>BBH', 0, 1, 1) ] + # IEEE 802.1X: received EAPOL-Start from STA + tests += [ _bssid + foreign + proto + struct.pack('>BBH', 0, 1, 0) ] + # frame too short for this IEEE 802.1X packet + tests += [ _bssid + foreign + proto + struct.pack('>BBH', 0, 1, 1) ] + # EAPOL-Key - Dropped key data from unauthorized Supplicant + tests += [ _bssid + foreign + proto + struct.pack('>BBH', 2, 3, 0) ] + # EAPOL-Encapsulated-ASF-Alert + tests += [ _bssid + foreign + proto + struct.pack('>BBH', 2, 4, 0) ] + # unknown IEEE 802.1X packet type + tests += [ _bssid + foreign + proto + struct.pack('>BBH', 2, 255, 0) ] + for t in tests: + sock.send(t) + +def test_rsn_preauth_local_errors(dev, apdev): + """RSN pre-authentication and local errors on AP""" + params = hostapd.wpa2_eap_params(ssid="test-wpa2-eap") + params['rsn_preauth'] = '1' + params['rsn_preauth_interfaces'] = "lo" + hapd = hostapd.add_ap(apdev[0], params) + bssid = hapd.own_addr() + _bssid = binascii.unhexlify(bssid.replace(':', '')) + + sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, + socket.htons(0x88c7)) + sock.bind(("lo", socket.htons(0x88c7))) + + foreign = "\x02\x03\x04\x05\x06\x07" + foreign2 = "\x02\x03\x04\x05\x06\x08" + proto = "\x88\xc7" + + with alloc_fail(hapd, 1, "ap_sta_add;rsn_preauth_receive"): + sock.send(_bssid + foreign + proto + struct.pack('>BBH', 2, 1, 0)) + wait_fail_trigger(hapd, "GET_ALLOC_FAIL") + + with alloc_fail(hapd, 1, "eapol_auth_alloc;rsn_preauth_receive"): + sock.send(_bssid + foreign + proto + struct.pack('>BBH', 2, 1, 0)) + wait_fail_trigger(hapd, "GET_ALLOC_FAIL") + sock.send(_bssid + foreign + proto + struct.pack('>BBH', 2, 1, 0)) + + with alloc_fail(hapd, 1, "eap_server_sm_init;ieee802_1x_new_station;rsn_preauth_receive"): + sock.send(_bssid + foreign2 + proto + struct.pack('>BBH', 2, 1, 0)) + wait_fail_trigger(hapd, "GET_ALLOC_FAIL") + sock.send(_bssid + foreign2 + proto + struct.pack('>BBH', 2, 1, 0)) + + hapd.request("DISABLE") + tests = [ (1, "=rsn_preauth_iface_add"), + (2, "=rsn_preauth_iface_add"), + (1, "l2_packet_init;rsn_preauth_iface_add"), + (1, "rsn_preauth_iface_init"), + (1, "rsn_preauth_iface_init") ] + for count,func in tests: + with alloc_fail(hapd, count, func): + if "FAIL" not in hapd.request("ENABLE"): + raise Exception("ENABLE succeeded unexpectedly") + + hapd.set("rsn_preauth_interfaces", "lo lo lo does-not-exist lo ") + if "FAIL" not in hapd.request("ENABLE"): + raise Exception("ENABLE succeeded unexpectedly") + hapd.set("rsn_preauth_interfaces", " lo lo ") + if "OK" not in hapd.request("ENABLE"): + raise Exception("ENABLE failed") + sock.send(_bssid + foreign + proto + struct.pack('>BBH', 2, 1, 0)) + sock.send(_bssid + foreign2 + proto + struct.pack('>BBH', 2, 1, 0))