tests: Protocol testing for supplicant PMF/IGTK KDE handling

Signed-off-by: Jouni Malinen <jouni@codeaurora.org>
This commit is contained in:
Jouni Malinen 2019-04-16 12:08:35 +03:00 committed by Jouni Malinen
parent 824cb5a530
commit 8030e2b594
3 changed files with 117 additions and 29 deletions

View file

@ -13,12 +13,13 @@ import logging
logger = logging.getLogger() logger = logging.getLogger()
import os import os
import re import re
import socket
import struct import struct
import subprocess import subprocess
import time import time
import hostapd import hostapd
from utils import HwsimSkip, fail_test, skip_with_fips from utils import HwsimSkip, fail_test, skip_with_fips, start_monitor, stop_monitor, radiotap_build
import hwsim_utils import hwsim_utils
from wpasupplicant import WpaSupplicant from wpasupplicant import WpaSupplicant
@ -1329,7 +1330,7 @@ def hapd_connected(hapd):
if ev is None: if ev is None:
raise Exception("Timeout on AP-STA-CONNECTED from hostapd") raise Exception("Timeout on AP-STA-CONNECTED from hostapd")
def eapol_test(apdev, dev, wpa2=True): def eapol_test(apdev, dev, wpa2=True, ieee80211w=0):
bssid = apdev['bssid'] bssid = apdev['bssid']
if wpa2: if wpa2:
ssid = "test-wpa2-psk" ssid = "test-wpa2-psk"
@ -1342,13 +1343,18 @@ def eapol_test(apdev, dev, wpa2=True):
else: else:
params = hostapd.wpa_params(ssid=ssid) params = hostapd.wpa_params(ssid=ssid)
params['wpa_psk'] = psk params['wpa_psk'] = psk
params['ieee80211w'] = str(ieee80211w)
hapd = hostapd.add_ap(apdev, params) hapd = hostapd.add_ap(apdev, params)
hapd.request("SET ext_eapol_frame_io 1") hapd.request("SET ext_eapol_frame_io 1")
dev.request("SET ext_eapol_frame_io 1") dev.request("SET ext_eapol_frame_io 1")
dev.connect(ssid, raw_psk=psk, scan_freq="2412", wait_connect=False) dev.connect(ssid, raw_psk=psk, scan_freq="2412", wait_connect=False,
ieee80211w=str(ieee80211w))
addr = dev.p2p_interface_addr() addr = dev.p2p_interface_addr()
if wpa2: if wpa2:
rsne = binascii.unhexlify('30140100000fac040100000fac040100000fac020000') if ieee80211w == 2:
rsne = binascii.unhexlify('30140100000fac040100000fac040100000fac02cc00')
else:
rsne = binascii.unhexlify('30140100000fac040100000fac040100000fac020000')
else: else:
rsne = binascii.unhexlify('dd160050f20101000050f20201000050f20201000050f202') rsne = binascii.unhexlify('dd160050f20101000050f20201000050f20201000050f202')
snonce = binascii.unhexlify('1111111111111111111111111111111111111111111111111111111111111111') snonce = binascii.unhexlify('1111111111111111111111111111111111111111111111111111111111111111')
@ -2321,6 +2327,87 @@ def test_ap_wpa2_psk_supp_proto_gtk_not_encrypted(dev, apdev):
raise Exception("Unencrypted GTK KDE not reported") raise Exception("Unencrypted GTK KDE not reported")
dev[0].wait_disconnected(timeout=1) dev[0].wait_disconnected(timeout=1)
def run_psk_supp_proto_pmf2(dev, apdev, igtk_kde=None, fail=False):
(bssid, ssid, hapd, snonce, pmk, addr, rsne) = eapol_test(apdev[0], dev[0],
ieee80211w=2)
# Wait for EAPOL-Key msg 1/4 from hostapd to determine when associated
msg = recv_eapol(hapd)
dev[0].dump_monitor()
# Build own EAPOL-Key msg 1/4
anonce = binascii.unhexlify('2222222222222222222222222222222222222222222222222222222222222222')
counter = 1
msg = build_eapol_key_1_4(anonce, replay_counter=counter)
counter += 1
send_eapol(dev[0], bssid, build_eapol(msg))
msg = recv_eapol(dev[0])
snonce = msg['rsn_key_nonce']
(ptk, kck, kek) = pmk_to_ptk(pmk, addr, bssid, snonce, anonce)
logger.debug("EAPOL-Key msg 3/4")
dev[0].dump_monitor()
gtk_kde = binascii.unhexlify('dd16000fac010100dc11188831bf4aa4a8678d2b41498618')
plain = rsne + gtk_kde
if igtk_kde:
plain += igtk_kde
wrapped = aes_wrap(kek, pad_key_data(plain))
msg = build_eapol_key_3_4(anonce, kck, wrapped, replay_counter=counter)
counter += 1
send_eapol(dev[0], bssid, build_eapol(msg))
if fail:
dev[0].wait_disconnected(timeout=1)
return
dev[0].wait_connected(timeout=1)
# Verify that an unprotected broadcast Deauthentication frame is ignored
bssid = binascii.unhexlify(hapd.own_addr().replace(':', ''))
sock = start_monitor(apdev[1]["ifname"])
radiotap = radiotap_build()
frame = binascii.unhexlify("c0003a01")
frame += 6*b'\xff' + bssid + bssid
frame += binascii.unhexlify("1000" + "0300")
sock.send(radiotap + frame)
# And same with incorrect BIP protection
for keyid in ["0400", "0500", "0600", "0004", "0005", "0006", "ffff"]:
frame2 = frame + binascii.unhexlify("4c10" + keyid + "010000000000c0e5ca5f2b3b4de9")
sock.send(radiotap + frame2)
ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=0.5)
if ev is not None:
raise Exception("Unexpected disconnection")
def run_psk_supp_proto_pmf(dev, apdev, igtk_kde=None, fail=False):
try:
run_psk_supp_proto_pmf2(dev, apdev, igtk_kde=igtk_kde, fail=fail)
finally:
stop_monitor(apdev[1]["ifname"])
def test_ap_wpa2_psk_supp_proto_no_igtk(dev, apdev):
"""WPA2-PSK supplicant protocol testing: no IGTK KDE"""
run_psk_supp_proto_pmf(dev, apdev, igtk_kde=None)
def test_ap_wpa2_psk_supp_proto_igtk_ok(dev, apdev):
"""WPA2-PSK supplicant protocol testing: valid IGTK KDE"""
igtk_kde = binascii.unhexlify('dd1c' + '000fac09' + '0400' + 6*'00' + 16*'77')
run_psk_supp_proto_pmf(dev, apdev, igtk_kde=igtk_kde)
def test_ap_wpa2_psk_supp_proto_igtk_keyid_swap(dev, apdev):
"""WPA2-PSK supplicant protocol testing: swapped IGTK KeyID"""
igtk_kde = binascii.unhexlify('dd1c' + '000fac09' + '0004' + 6*'00' + 16*'77')
run_psk_supp_proto_pmf(dev, apdev, igtk_kde=igtk_kde)
def test_ap_wpa2_psk_supp_proto_igtk_keyid_too_large(dev, apdev):
"""WPA2-PSK supplicant protocol testing: too large IGTK KeyID"""
igtk_kde = binascii.unhexlify('dd1c' + '000fac09' + 'ffff' + 6*'00' + 16*'77')
run_psk_supp_proto_pmf(dev, apdev, igtk_kde=igtk_kde, fail=True)
def test_ap_wpa2_psk_supp_proto_igtk_keyid_unexpected(dev, apdev):
"""WPA2-PSK supplicant protocol testing: unexpected IGTK KeyID"""
igtk_kde = binascii.unhexlify('dd1c' + '000fac09' + '0006' + 6*'00' + 16*'77')
run_psk_supp_proto_pmf(dev, apdev, igtk_kde=igtk_kde, fail=True)
def find_wpas_process(dev): def find_wpas_process(dev):
ifname = dev.ifname ifname = dev.ifname
err, data = dev.cmd_execute(['ps', 'ax']) err, data = dev.cmd_execute(['ps', 'ax'])

View file

@ -17,7 +17,7 @@ import subprocess
import hwsim_utils import hwsim_utils
import hostapd import hostapd
from wpasupplicant import WpaSupplicant from wpasupplicant import WpaSupplicant
from utils import HwsimSkip, alloc_fail, fail_test, wait_fail_trigger from utils import HwsimSkip, alloc_fail, fail_test, wait_fail_trigger, start_monitor, stop_monitor, radiotap_build
from test_ap_psk import find_wpas_process, read_process_memory, verify_not_present, get_key_locations from test_ap_psk import find_wpas_process, read_process_memory, verify_not_present, get_key_locations
@remote_compatible @remote_compatible
@ -1570,30 +1570,6 @@ def sae_rx_commit_token_req(sock, radiotap, send_two=False):
sock.send(radiotap + frame) sock.send(radiotap + frame)
return True return True
def radiotap_build():
radiotap_payload = struct.pack('BB', 0x08, 0)
radiotap_payload += struct.pack('BB', 0, 0)
radiotap_payload += struct.pack('BB', 0, 0)
radiotap_hdr = struct.pack('<BBHL', 0, 0, 8 + len(radiotap_payload),
0xc002)
return radiotap_hdr + radiotap_payload
def start_monitor(ifname, freq=2412):
subprocess.check_call(["iw", ifname, "set", "type", "monitor"])
subprocess.call(["ip", "link", "set", "dev", ifname, "up"])
subprocess.check_call(["iw", ifname, "set", "freq", str(freq)])
ETH_P_ALL = 3
sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
socket.htons(ETH_P_ALL))
sock.bind((ifname, 0))
sock.settimeout(0.5)
return sock
def stop_monitor(ifname):
subprocess.call(["ip", "link", "set", "dev", ifname, "down"])
subprocess.call(["iw", ifname, "set", "type", "managed"])
def run_sae_anti_clogging_during_attack(dev, apdev): def run_sae_anti_clogging_during_attack(dev, apdev):
if "SAE" not in dev[0].get_capability("auth_alg"): if "SAE" not in dev[0].get_capability("auth_alg"):
raise HwsimSkip("SAE not supported") raise HwsimSkip("SAE not supported")

View file

@ -6,6 +6,7 @@
import binascii import binascii
import os import os
import socket
import struct import struct
import subprocess import subprocess
import time import time
@ -162,3 +163,27 @@ def clear_regdom(hapd, dev, count=1):
clear_country(dev) clear_country(dev)
for i in range(count): for i in range(count):
dev[i].flush_scan_cache() dev[i].flush_scan_cache()
def radiotap_build():
radiotap_payload = struct.pack('BB', 0x08, 0)
radiotap_payload += struct.pack('BB', 0, 0)
radiotap_payload += struct.pack('BB', 0, 0)
radiotap_hdr = struct.pack('<BBHL', 0, 0, 8 + len(radiotap_payload),
0xc002)
return radiotap_hdr + radiotap_payload
def start_monitor(ifname, freq=2412):
subprocess.check_call(["iw", ifname, "set", "type", "monitor"])
subprocess.call(["ip", "link", "set", "dev", ifname, "up"])
subprocess.check_call(["iw", ifname, "set", "freq", str(freq)])
ETH_P_ALL = 3
sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
socket.htons(ETH_P_ALL))
sock.bind((ifname, 0))
sock.settimeout(0.5)
return sock
def stop_monitor(ifname):
subprocess.call(["ip", "link", "set", "dev", ifname, "down"])
subprocess.call(["iw", ifname, "set", "type", "managed"])