6734ba0c00
Perform detailed tests with OCV enabled, for both the 4-way and group key handshakes. These tests include establishing a working connection with OCV enabled, assuring that a STA without OCV enabled can still connect to a STA with OCV enabled (and vice versa), verifying that invalid OCI elements get silently ignored, verifying that missing OCI elements are reported, and so on. Signed-off-by: Mathy Vanhoef <Mathy.Vanhoef@cs.kuleuven.be>
864 lines
33 KiB
Python
864 lines
33 KiB
Python
# WPA2-Personal OCV tests
|
|
# Copyright (c) 2018, Mathy Vanhoef
|
|
#
|
|
# This software may be distributed under the terms of the BSD license.
|
|
# See README for more details
|
|
|
|
from remotehost import remote_compatible
|
|
import binascii, struct
|
|
import logging, time
|
|
logger = logging.getLogger()
|
|
|
|
import hostapd
|
|
from wpasupplicant import WpaSupplicant
|
|
import hwsim_utils
|
|
from utils import HwsimSkip
|
|
|
|
from test_ap_ht import set_world_reg
|
|
from test_ap_psk import parse_eapol, build_eapol, pmk_to_ptk, eapol_key_mic, recv_eapol, send_eapol, reply_eapol, hapd_connected, build_eapol_key_3_4, aes_wrap, pad_key_data
|
|
|
|
#TODO: Refuse setting up AP with OCV but without MFP support
|
|
#TODO: Refuse to connect to AP that advertises OCV but not MFP
|
|
|
|
def make_ocikde(op_class, channel, seg1_idx):
|
|
WLAN_EID_VENDOR_SPECIFIC = 221
|
|
RSN_KEY_DATA_OCI = "\x00\x0f\xac\x0d"
|
|
|
|
data = RSN_KEY_DATA_OCI + struct.pack("<BBB", op_class, channel, seg1_idx)
|
|
ocikde = struct.pack("<BB", WLAN_EID_VENDOR_SPECIFIC, len(data)) + data
|
|
|
|
return ocikde
|
|
|
|
def ocv_setup_ap(apdev, params):
|
|
ssid = "test-wpa2-ocv"
|
|
passphrase = "qwertyuiop"
|
|
params.update(hostapd.wpa2_params(ssid=ssid, passphrase=passphrase))
|
|
try:
|
|
hapd = hostapd.add_ap(apdev, params)
|
|
except Exception, e:
|
|
if "Failed to set hostapd parameter ocv" in str(e):
|
|
raise HwsimSkip("OCV not supported")
|
|
raise
|
|
return hapd, ssid, passphrase
|
|
|
|
def build_eapol_key_1_2(kck, key_data, replay_counter=3, key_info=0x1382,
|
|
extra_len=0, descr_type=2, key_len=16):
|
|
msg = {}
|
|
msg['version'] = 2
|
|
msg['type'] = 3
|
|
msg['length'] = 95 + len(key_data) + extra_len
|
|
|
|
msg['descr_type'] = descr_type
|
|
msg['rsn_key_info'] = key_info
|
|
msg['rsn_key_len'] = key_len
|
|
msg['rsn_replay_counter'] = struct.pack('>Q', replay_counter)
|
|
msg['rsn_key_nonce'] = binascii.unhexlify('0000000000000000000000000000000000000000000000000000000000000000')
|
|
msg['rsn_key_iv'] = binascii.unhexlify('00000000000000000000000000000000')
|
|
msg['rsn_key_rsc'] = binascii.unhexlify('0000000000000000')
|
|
msg['rsn_key_id'] = binascii.unhexlify('0000000000000000')
|
|
msg['rsn_key_data_len'] = len(key_data)
|
|
msg['rsn_key_data'] = key_data
|
|
eapol_key_mic(kck, msg)
|
|
return msg
|
|
|
|
def build_eapol_key_2_2(kck, key_data, replay_counter=3, key_info=0x0302,
|
|
extra_len=0, descr_type=2, key_len=16):
|
|
return build_eapol_key_1_2(kck, key_data, replay_counter, key_info,
|
|
extra_len, descr_type, key_len)
|
|
|
|
@remote_compatible
|
|
def test_wpa2_ocv(dev, apdev):
|
|
"""OCV on 2.4 GHz"""
|
|
params = { "channel": "1",
|
|
"ieee80211w": "2",
|
|
"ocv": "1" }
|
|
hapd, ssid, passphrase = ocv_setup_ap(apdev[0], params)
|
|
for ocv in range(2):
|
|
dev[0].connect(ssid, psk=passphrase, scan_freq="2412", ocv=str(ocv),
|
|
ieee80211w="1")
|
|
dev[0].request("REMOVE_NETWORK all")
|
|
dev[0].wait_disconnected()
|
|
|
|
@remote_compatible
|
|
def test_wpa2_ocv_5ghz(dev, apdev):
|
|
"""OCV on 5 GHz"""
|
|
try:
|
|
run_wpa2_ocv_5ghz(dev, apdev)
|
|
finally:
|
|
set_world_reg(apdev[0], apdev[1], dev[0])
|
|
dev[0].flush_scan_cache()
|
|
|
|
def run_wpa2_ocv_5ghz(dev, apdev):
|
|
params = { "hw_mode": "a",
|
|
"channel": "40",
|
|
"ieee80211w": "2",
|
|
"country_code": "US",
|
|
"ocv": "1" }
|
|
hapd, ssid, passphrase = ocv_setup_ap(apdev[0], params)
|
|
for ocv in range(2):
|
|
dev[0].connect(ssid, psk=passphrase, scan_freq="5200", ocv=str(ocv),
|
|
ieee80211w="1")
|
|
dev[0].request("REMOVE_NETWORK all")
|
|
dev[0].wait_disconnected()
|
|
|
|
@remote_compatible
|
|
def test_wpa2_ocv_ht20(dev, apdev):
|
|
"""OCV with HT20 channel"""
|
|
params = { "channel": "6",
|
|
"ieee80211n": "1",
|
|
"ieee80211w": "1",
|
|
"ocv": "1" }
|
|
hapd, ssid, passphrase = ocv_setup_ap(apdev[0], params)
|
|
for ocv in range(2):
|
|
dev[0].connect(ssid, psk=passphrase, scan_freq="2437", ocv=str(ocv),
|
|
ieee80211w="1", disable_ht="1")
|
|
dev[1].connect(ssid, psk=passphrase, scan_freq="2437", ocv=str(ocv),
|
|
ieee80211w="1")
|
|
dev[0].request("REMOVE_NETWORK all")
|
|
dev[1].request("REMOVE_NETWORK all")
|
|
dev[0].wait_disconnected()
|
|
dev[1].wait_disconnected()
|
|
|
|
@remote_compatible
|
|
def test_wpa2_ocv_ht40(dev, apdev):
|
|
"""OCV with HT40 channel"""
|
|
try:
|
|
run_wpa2_ocv_ht40(dev, apdev)
|
|
finally:
|
|
set_world_reg(apdev[0], apdev[1], dev[0])
|
|
dev[0].flush_scan_cache()
|
|
dev[1].flush_scan_cache()
|
|
|
|
def run_wpa2_ocv_ht40(dev, apdev):
|
|
for channel, capab, freq, mode in [( "6", "[HT40-]", "2437", "g"),
|
|
( "6", "[HT40+]", "2437", "g"),
|
|
("40", "[HT40-]", "5200", "a"),
|
|
("36", "[HT40+]", "5180", "a")]:
|
|
params = { "hw_mode": mode,
|
|
"channel": channel,
|
|
"country_code": "US",
|
|
"ieee80211n": "1",
|
|
"ht_capab": capab,
|
|
"ieee80211w": "1",
|
|
"ocv": "1" }
|
|
hapd, ssid, passphrase = ocv_setup_ap(apdev[0], params)
|
|
dev[0].flush_scan_cache()
|
|
dev[1].flush_scan_cache()
|
|
for ocv in range(2):
|
|
dev[0].connect(ssid, psk=passphrase, scan_freq=freq, ocv=str(ocv),
|
|
ieee80211w="1", disable_ht="1")
|
|
dev[1].connect(ssid, psk=passphrase, scan_freq=freq, ocv=str(ocv),
|
|
ieee80211w="1")
|
|
dev[0].request("REMOVE_NETWORK all")
|
|
dev[1].request("REMOVE_NETWORK all")
|
|
dev[0].wait_disconnected()
|
|
dev[1].wait_disconnected()
|
|
hapd.disable()
|
|
|
|
@remote_compatible
|
|
def test_wpa2_ocv_vht40(dev, apdev):
|
|
"""OCV with VHT40 channel"""
|
|
try:
|
|
run_wpa2_ocv_vht40(dev, apdev)
|
|
finally:
|
|
set_world_reg(apdev[0], apdev[1], dev[0])
|
|
dev[0].flush_scan_cache()
|
|
dev[1].flush_scan_cache()
|
|
dev[2].flush_scan_cache()
|
|
|
|
def run_wpa2_ocv_vht40(dev, apdev):
|
|
for channel, capab, freq in [("40", "[HT40-]", "5200"),
|
|
("36", "[HT40+]", "5180")]:
|
|
params = { "hw_mode": "a",
|
|
"channel": channel,
|
|
"country_code": "US",
|
|
"ht_capab": capab,
|
|
"ieee80211n": "1",
|
|
"ieee80211ac": "1",
|
|
"vht_oper_chwidth": "0",
|
|
"vht_oper_centr_freq_seg0_idx": "38",
|
|
"ieee80211w": "1",
|
|
"ocv": "1" }
|
|
hapd, ssid, passphrase = ocv_setup_ap(apdev[0], params)
|
|
dev[0].flush_scan_cache()
|
|
dev[1].flush_scan_cache()
|
|
dev[2].flush_scan_cache()
|
|
for ocv in range(2):
|
|
dev[0].connect(ssid, psk=passphrase, scan_freq=freq, ocv=str(ocv),
|
|
ieee80211w="1", disable_ht="1")
|
|
dev[1].connect(ssid, psk=passphrase, scan_freq=freq, ocv=str(ocv),
|
|
ieee80211w="1", disable_vht="1")
|
|
dev[2].connect(ssid, psk=passphrase, scan_freq=freq, ocv=str(ocv),
|
|
ieee80211w="1")
|
|
dev[0].request("REMOVE_NETWORK all")
|
|
dev[1].request("REMOVE_NETWORK all")
|
|
dev[2].request("REMOVE_NETWORK all")
|
|
dev[0].wait_disconnected()
|
|
dev[1].wait_disconnected()
|
|
dev[2].wait_disconnected()
|
|
hapd.disable()
|
|
|
|
@remote_compatible
|
|
def test_wpa2_ocv_vht80(dev, apdev):
|
|
"""OCV with VHT80 channel"""
|
|
try:
|
|
run_wpa2_ocv_vht80(dev, apdev)
|
|
finally:
|
|
set_world_reg(apdev[0], apdev[1], dev[0])
|
|
dev[0].flush_scan_cache()
|
|
dev[1].flush_scan_cache()
|
|
dev[2].flush_scan_cache()
|
|
|
|
def run_wpa2_ocv_vht80(dev, apdev):
|
|
for channel, capab, freq in [("40", "[HT40-]", "5200"),
|
|
("36", "[HT40+]", "5180")]:
|
|
params = { "hw_mode": "a",
|
|
"channel": channel,
|
|
"country_code": "US",
|
|
"ht_capab": capab,
|
|
"ieee80211n": "1",
|
|
"ieee80211ac": "1",
|
|
"vht_oper_chwidth": "1",
|
|
"vht_oper_centr_freq_seg0_idx": "42",
|
|
"ieee80211w": "1",
|
|
"ocv": "1" }
|
|
hapd, ssid, passphrase = ocv_setup_ap(apdev[0], params)
|
|
for ocv in range(2):
|
|
dev[0].connect(ssid, psk=passphrase, scan_freq=freq, ocv=str(ocv),
|
|
ieee80211w="1", disable_ht="1")
|
|
dev[1].connect(ssid, psk=passphrase, scan_freq=freq, ocv=str(ocv),
|
|
ieee80211w="1", disable_vht="1")
|
|
dev[2].connect(ssid, psk=passphrase, scan_freq=freq, ocv=str(ocv),
|
|
ieee80211w="1")
|
|
dev[0].request("REMOVE_NETWORK all")
|
|
dev[1].request("REMOVE_NETWORK all")
|
|
dev[2].request("REMOVE_NETWORK all")
|
|
dev[0].wait_disconnected()
|
|
dev[1].wait_disconnected()
|
|
dev[2].wait_disconnected()
|
|
hapd.disable()
|
|
|
|
@remote_compatible
|
|
def test_wpa2_ocv_vht160(dev, apdev):
|
|
"""OCV with VHT160 channel"""
|
|
try:
|
|
run_wpa2_ocv_vht160(dev, apdev)
|
|
finally:
|
|
set_world_reg(apdev[0], apdev[1], dev[0])
|
|
dev[0].flush_scan_cache()
|
|
dev[1].flush_scan_cache()
|
|
dev[2].flush_scan_cache()
|
|
|
|
def run_wpa2_ocv_vht160(dev, apdev):
|
|
for channel, capab, freq in [("100", "[HT40+]", "5500"),
|
|
("104", "[HT40-]", "5520")]:
|
|
params = { "hw_mode": "a",
|
|
"channel": channel,
|
|
"country_code": "ZA",
|
|
"ht_capab": capab,
|
|
"ieee80211n": "1",
|
|
"ieee80211ac": "1",
|
|
"vht_oper_chwidth": "2",
|
|
"vht_oper_centr_freq_seg0_idx": "114",
|
|
"ieee80211w": "1",
|
|
"ocv": "1" }
|
|
hapd, ssid, passphrase = ocv_setup_ap(apdev[0], params)
|
|
for ocv in range(2):
|
|
dev[0].connect(ssid, psk=passphrase, scan_freq=freq, ocv=str(ocv),
|
|
ieee80211w="1", disable_ht="1")
|
|
dev[1].connect(ssid, psk=passphrase, scan_freq=freq, ocv=str(ocv),
|
|
ieee80211w="1", disable_vht="1")
|
|
dev[2].connect(ssid, psk=passphrase, scan_freq=freq, ocv=str(ocv),
|
|
ieee80211w="1")
|
|
dev[0].request("REMOVE_NETWORK all")
|
|
dev[1].request("REMOVE_NETWORK all")
|
|
dev[2].request("REMOVE_NETWORK all")
|
|
dev[0].wait_disconnected()
|
|
dev[1].wait_disconnected()
|
|
dev[2].wait_disconnected()
|
|
hapd.disable()
|
|
|
|
@remote_compatible
|
|
def test_wpa2_ocv_vht80plus80(dev, apdev):
|
|
"""OCV with VHT80+80 channel"""
|
|
try:
|
|
run_wpa2_ocv_vht80plus80(dev, apdev)
|
|
finally:
|
|
set_world_reg(apdev[0], apdev[1], dev[0])
|
|
dev[0].flush_scan_cache()
|
|
dev[1].flush_scan_cache()
|
|
dev[2].flush_scan_cache()
|
|
|
|
def run_wpa2_ocv_vht80plus80(dev, apdev):
|
|
for channel, capab, freq in [("36", "[HT40+]", "5180"),
|
|
("40", "[HT40-]", "5200")]:
|
|
params = { "hw_mode": "a",
|
|
"channel": channel,
|
|
"country_code": "US",
|
|
"ht_capab": capab,
|
|
"ieee80211n": "1",
|
|
"ieee80211ac": "1",
|
|
"vht_oper_chwidth": "3",
|
|
"vht_oper_centr_freq_seg0_idx": "42",
|
|
"vht_oper_centr_freq_seg1_idx": "155",
|
|
"ieee80211w": "1",
|
|
"ieee80211d": "1",
|
|
"ieee80211h": "1",
|
|
"ocv": "1" }
|
|
hapd, ssid, passphrase = ocv_setup_ap(apdev[0], params)
|
|
for ocv in range(2):
|
|
dev[0].connect(ssid, psk=passphrase, scan_freq=freq, ocv=str(ocv),
|
|
ieee80211w="1", disable_ht="1")
|
|
dev[1].connect(ssid, psk=passphrase, scan_freq=freq, ocv=str(ocv),
|
|
ieee80211w="1", disable_vht="1")
|
|
dev[2].connect(ssid, psk=passphrase, scan_freq=freq, ocv=str(ocv),
|
|
ieee80211w="1")
|
|
dev[0].request("REMOVE_NETWORK all")
|
|
dev[1].request("REMOVE_NETWORK all")
|
|
dev[2].request("REMOVE_NETWORK all")
|
|
dev[0].wait_disconnected()
|
|
dev[1].wait_disconnected()
|
|
dev[2].wait_disconnected()
|
|
hapd.disable()
|
|
|
|
class APConnection:
|
|
def init_params(self):
|
|
# Static parameters
|
|
self.ssid = "test-wpa2-ocv"
|
|
self.passphrase = "qwertyuiop"
|
|
self.psk = "c2c6c255af836bed1b3f2f1ded98e052f5ad618bb554e2836757b55854a0eab7"
|
|
|
|
# Dynamic parameters
|
|
self.hapd = None
|
|
self.addr = None
|
|
self.rsne = None
|
|
self.kck = None
|
|
self.kek = None
|
|
self.msg = None
|
|
self.bssid = None
|
|
self.anonce = None
|
|
self.snonce = None
|
|
|
|
def __init__(self, apdev, dev, params):
|
|
self.init_params()
|
|
|
|
# By default, OCV is enabled for both the client and AP. The following
|
|
# parameters can be used to disable OCV for the client or AP.
|
|
ap_ocv = params.pop("ap_ocv", "1")
|
|
sta_ocv = params.pop("sta_ocv", "1")
|
|
|
|
freq = params.pop("freq")
|
|
params.update(hostapd.wpa2_params(ssid=self.ssid,
|
|
passphrase=self.passphrase))
|
|
params["wpa_pairwise_update_count"] = "10"
|
|
params["ocv"] = ap_ocv
|
|
try:
|
|
self.hapd = hostapd.add_ap(apdev, params)
|
|
except Exception, e:
|
|
if "Failed to set hostapd parameter ocv" in str(e):
|
|
raise HwsimSkip("OCV not supported")
|
|
raise
|
|
self.hapd.request("SET ext_eapol_frame_io 1")
|
|
dev.request("SET ext_eapol_frame_io 1")
|
|
|
|
self.bssid = apdev['bssid']
|
|
pmk = binascii.unhexlify("c2c6c255af836bed1b3f2f1ded98e052f5ad618bb554e2836757b55854a0eab7")
|
|
|
|
if sta_ocv != "0":
|
|
self.rsne = binascii.unhexlify("301a0100000fac040100000fac040100000fac0280400000000fac06")
|
|
else:
|
|
self.rsne = binascii.unhexlify("301a0100000fac040100000fac040100000fac0280000000000fac06")
|
|
self.snonce = binascii.unhexlify('1111111111111111111111111111111111111111111111111111111111111111')
|
|
|
|
dev.connect(self.ssid, raw_psk=self.psk, scan_freq=freq, ocv=sta_ocv,
|
|
ieee80211w="1", wait_connect=False)
|
|
self.addr = dev.p2p_interface_addr()
|
|
|
|
# Wait for EAPOL-Key msg 1/4 from hostapd to determine when associated
|
|
self.msg = recv_eapol(self.hapd)
|
|
self.anonce = self.msg['rsn_key_nonce']
|
|
(ptk, self.kck, self.kek) = pmk_to_ptk(pmk, self.addr, self.bssid,
|
|
self.snonce,self.anonce)
|
|
|
|
# hapd, addr, rsne, kck, msg, anonce, snonce
|
|
def test_bad_oci(self, logmsg, op_class, channel, seg1_idx):
|
|
logger.debug("Bad OCI element: " + logmsg)
|
|
if op_class is None:
|
|
ocikde = ""
|
|
else:
|
|
ocikde = make_ocikde(op_class, channel, seg1_idx)
|
|
|
|
reply_eapol("2/4", self.hapd, self.addr, self.msg, 0x010a, self.snonce,
|
|
self.rsne + ocikde, self.kck)
|
|
self.msg = recv_eapol(self.hapd)
|
|
if self.anonce != self.msg['rsn_key_nonce'] or self.msg["rsn_key_info"] != 138:
|
|
raise Exception("Didn't receive retransmitted 1/4")
|
|
|
|
def confirm_valid_oci(self, op_class, channel, seg1_idx):
|
|
logger.debug("Valid OCI element to complete handshake")
|
|
ocikde = make_ocikde(op_class, channel, seg1_idx)
|
|
|
|
reply_eapol("2/4", self.hapd, self.addr, self.msg, 0x010a, self.snonce,
|
|
self.rsne + ocikde, self.kck)
|
|
self.msg = recv_eapol(self.hapd)
|
|
if self.anonce != self.msg['rsn_key_nonce'] or self.msg["rsn_key_info"] != 5066:
|
|
raise Exception("Didn't receive 3/4 in response to valid 2/4")
|
|
|
|
reply_eapol("4/4", self.hapd, self.addr, self.msg, 0x030a, None, None,
|
|
self.kck)
|
|
hapd_connected(self.hapd)
|
|
|
|
@remote_compatible
|
|
def test_wpa2_ocv_ap_mismatch(dev, apdev):
|
|
"""OCV AP mismatch"""
|
|
params = { "channel": "1",
|
|
"ieee80211w": "1",
|
|
"freq": "2412" }
|
|
conn = APConnection(apdev[0], dev[0], params)
|
|
conn.test_bad_oci("element missing", None, 0, 0)
|
|
conn.test_bad_oci("wrong channel number", 81, 6, 0)
|
|
conn.test_bad_oci("invalid channel number", 81, 0, 0)
|
|
conn.test_bad_oci("wrong operating class", 80, 0, 0)
|
|
conn.test_bad_oci("invalid operating class", 0, 0, 0)
|
|
conn.confirm_valid_oci(81, 1, 0)
|
|
|
|
@remote_compatible
|
|
def test_wpa2_ocv_ap_ht_mismatch(dev, apdev):
|
|
"""OCV AP mismatch (HT)"""
|
|
params = { "channel": "6",
|
|
"ht_capab": "[HT40-]",
|
|
"ieee80211w": "1",
|
|
"freq": "2437" }
|
|
conn = APConnection(apdev[0], dev[0], params)
|
|
conn.test_bad_oci("wrong primary channel", 84, 5, 0)
|
|
conn.test_bad_oci("lower bandwidth than negotiated", 81, 6, 0)
|
|
conn.test_bad_oci("bad upper/lower channel", 83, 6, 0)
|
|
conn.confirm_valid_oci(84, 6, 0)
|
|
|
|
@remote_compatible
|
|
def test_wpa2_ocv_ap_vht80_mismatch(dev, apdev):
|
|
"""OCV AP mismatch (VHT80)"""
|
|
try:
|
|
run_wpa2_ocv_ap_vht80_mismatch(dev, apdev)
|
|
finally:
|
|
set_world_reg(apdev[0], apdev[1], dev[0])
|
|
dev[0].flush_scan_cache()
|
|
|
|
def run_wpa2_ocv_ap_vht80_mismatch(dev, apdev):
|
|
params = { "hw_mode": "a",
|
|
"channel": "36",
|
|
"country_code": "US",
|
|
"ht_capab": "[HT40+]",
|
|
"ieee80211w": "1",
|
|
"ieee80211n": "1",
|
|
"ieee80211ac": "1",
|
|
"vht_oper_chwidth": "1",
|
|
"freq": "5180",
|
|
"vht_oper_centr_freq_seg0_idx": "42" }
|
|
conn = APConnection(apdev[0], dev[0], params)
|
|
conn.test_bad_oci("wrong primary channel", 128, 38, 0)
|
|
conn.test_bad_oci("wrong primary channel", 128, 32, 0)
|
|
conn.test_bad_oci("smaller bandwidth than negotiated", 116, 36, 0)
|
|
conn.test_bad_oci("smaller bandwidth than negotiated", 115, 36, 0)
|
|
conn.confirm_valid_oci(128, 36, 0)
|
|
|
|
dev[0].dump_monitor()
|
|
dev[0].request("DISCONNECT")
|
|
dev[0].wait_disconnected()
|
|
|
|
@remote_compatible
|
|
def test_wpa2_ocv_ap_vht160_mismatch(dev, apdev):
|
|
"""OCV AP mismatch (VHT160)"""
|
|
try:
|
|
run_wpa2_ocv_ap_vht160_mismatch(dev, apdev)
|
|
finally:
|
|
set_world_reg(apdev[0], apdev[1], dev[0])
|
|
dev[0].flush_scan_cache()
|
|
|
|
def run_wpa2_ocv_ap_vht160_mismatch(dev, apdev):
|
|
params = { "hw_mode": "a",
|
|
"channel": "100",
|
|
"country_code": "ZA",
|
|
"ht_capab": "[HT40+]",
|
|
"ieee80211w": "1",
|
|
"ieee80211n": "1",
|
|
"ieee80211ac": "1",
|
|
"vht_oper_chwidth": "2",
|
|
"freq": "5500",
|
|
"vht_oper_centr_freq_seg0_idx": "114",
|
|
"ieee80211d": "1",
|
|
"ieee80211h": "1" }
|
|
conn = APConnection(apdev[0], dev[0], params)
|
|
conn.test_bad_oci("wrong primary channel", 129, 36, 0)
|
|
conn.test_bad_oci("wrong primary channel", 129, 114, 0)
|
|
conn.test_bad_oci("smaller bandwidth (20 Mhz) than negotiated", 121, 100, 0)
|
|
conn.test_bad_oci("smaller bandwidth (40 Mhz) than negotiated", 122, 100, 0)
|
|
conn.test_bad_oci("smaller bandwidth (80 Mhz) than negotiated", 128, 100, 0)
|
|
conn.test_bad_oci("using 80+80 channel instead of 160", 130, 100, 155)
|
|
conn.confirm_valid_oci(129, 100, 0)
|
|
|
|
dev[0].dump_monitor()
|
|
dev[0].request("DISCONNECT")
|
|
dev[0].wait_disconnected()
|
|
|
|
@remote_compatible
|
|
def test_wpa2_ocv_ap_vht80plus80_mismatch(dev, apdev):
|
|
"""OCV AP mismatch (VHT80+80)"""
|
|
try:
|
|
run_wpa2_ocv_ap_vht80plus80_mismatch(dev, apdev)
|
|
finally:
|
|
set_world_reg(apdev[0], apdev[1], dev[0])
|
|
dev[0].flush_scan_cache()
|
|
|
|
def run_wpa2_ocv_ap_vht80plus80_mismatch(dev, apdev):
|
|
params = { "hw_mode": "a",
|
|
"channel": "36",
|
|
"country_code": "US",
|
|
"ht_capab": "[HT40+]",
|
|
"ieee80211w": "1",
|
|
"ieee80211n": "1",
|
|
"ieee80211ac": "1",
|
|
"vht_oper_chwidth": "3",
|
|
"freq": "5180",
|
|
"vht_oper_centr_freq_seg0_idx": "42",
|
|
"ieee80211d": "1",
|
|
"vht_oper_centr_freq_seg1_idx": "155",
|
|
"ieee80211h": "1" }
|
|
conn = APConnection(apdev[0], dev[0], params)
|
|
conn.test_bad_oci("using 80 MHz operating class", 128, 36, 155)
|
|
conn.test_bad_oci("wrong frequency segment 1", 130, 36, 138)
|
|
conn.confirm_valid_oci(130, 36, 155)
|
|
|
|
dev[0].dump_monitor()
|
|
dev[0].request("DISCONNECT")
|
|
dev[0].wait_disconnected()
|
|
|
|
@remote_compatible
|
|
def test_wpa2_ocv_ap_unexpected1(dev, apdev):
|
|
"""OCV and unexpected OCI KDE from station"""
|
|
params = { "channel": "1",
|
|
"ieee80211w": "1",
|
|
"ap_ocv": "0",
|
|
"sta_ocv": "1",
|
|
"freq": "2412" }
|
|
conn = APConnection(apdev[0], dev[0], params)
|
|
logger.debug("Client will send OCI KDE even if it was not negotiated")
|
|
conn.confirm_valid_oci(81, 1, 0)
|
|
|
|
@remote_compatible
|
|
def test_wpa2_ocv_ap_unexpected2(dev, apdev):
|
|
"""OCV and unexpected OCI KDE from station"""
|
|
params = { "channel": "1",
|
|
"ieee80211w": "1",
|
|
"ap_ocv": "1",
|
|
"sta_ocv": "0",
|
|
"freq": "2412" }
|
|
conn = APConnection(apdev[0], dev[0], params)
|
|
logger.debug("Client will send OCI KDE even if it was not negotiated")
|
|
conn.confirm_valid_oci(81, 1, 0)
|
|
|
|
@remote_compatible
|
|
def test_wpa2_ocv_ap_retransmit_msg3(dev, apdev):
|
|
"""Verify that manually retransmitted msg 3/4 contain a correct OCI"""
|
|
bssid = apdev[0]['bssid']
|
|
ssid = "test-wpa2-ocv"
|
|
passphrase = "qwertyuiop"
|
|
psk = "c2c6c255af836bed1b3f2f1ded98e052f5ad618bb554e2836757b55854a0eab7"
|
|
params = hostapd.wpa2_params(ssid=ssid)
|
|
params["wpa_psk"] = psk
|
|
params["ieee80211w"] = "1"
|
|
params["ocv"] = "1"
|
|
params['wpa_disable_eapol_key_retries'] = "1"
|
|
try:
|
|
hapd = hostapd.add_ap(apdev[0], params)
|
|
except Exception, e:
|
|
if "Failed to set hostapd parameter ocv" in str(e):
|
|
raise HwsimSkip("OCV not supported")
|
|
raise
|
|
hapd.request("SET ext_eapol_frame_io 1")
|
|
dev[0].request("SET ext_eapol_frame_io 1")
|
|
dev[0].connect(ssid, psk=passphrase, scan_freq="2412", wait_connect=False,
|
|
ocv="1", ieee80211w="1")
|
|
addr = dev[0].own_addr()
|
|
|
|
# EAPOL-Key msg 1/4
|
|
ev = hapd.wait_event(["EAPOL-TX"], timeout=15)
|
|
if ev is None:
|
|
raise Exception("Timeout on EAPOL-TX from hostapd")
|
|
res = dev[0].request("EAPOL_RX " + bssid + " " + ev.split(' ')[2])
|
|
if "OK" not in res:
|
|
raise Exception("EAPOL_RX to wpa_supplicant failed")
|
|
|
|
# EAPOL-Key msg 2/4
|
|
ev = dev[0].wait_event(["EAPOL-TX"], timeout=15)
|
|
if ev is None:
|
|
raise Exception("Timeout on EAPOL-TX from wpa_supplicant")
|
|
res = hapd.request("EAPOL_RX " + addr + " " + ev.split(' ')[2])
|
|
if "OK" not in res:
|
|
raise Exception("EAPOL_RX to hostapd failed")
|
|
|
|
# EAPOL-Key msg 3/4
|
|
ev = hapd.wait_event(["EAPOL-TX"], timeout=15)
|
|
if ev is None:
|
|
raise Exception("Timeout on EAPOL-TX from hostapd")
|
|
logger.info("Drop the first EAPOL-Key msg 3/4")
|
|
|
|
# Use normal EAPOL TX/RX to handle retries.
|
|
hapd.request("SET ext_eapol_frame_io 0")
|
|
dev[0].request("SET ext_eapol_frame_io 0")
|
|
|
|
# Manually retransmit EAPOL-Key msg 3/4
|
|
if "OK" not in hapd.request("RESEND_M3 " + addr):
|
|
raise Exception("RESEND_M3 failed")
|
|
|
|
dev[0].wait_connected()
|
|
hwsim_utils.test_connectivity(dev[0], hapd)
|
|
|
|
def test_wpa2_ocv_ap_group_hs(dev, apdev):
|
|
"""OCV group handshake (AP)"""
|
|
params = { "channel": "1",
|
|
"ieee80211w": "1",
|
|
"freq": "2412",
|
|
"wpa_strict_rekey": "1" }
|
|
conn = APConnection(apdev[0], dev[0], params)
|
|
conn.confirm_valid_oci(81, 1, 0)
|
|
|
|
conn.hapd.request("SET ext_eapol_frame_io 0")
|
|
dev[1].connect(conn.ssid, psk=conn.passphrase, scan_freq="2412", ocv="1",
|
|
ieee80211w="1")
|
|
conn.hapd.request("SET ext_eapol_frame_io 1")
|
|
|
|
# Trigger a group key handshake
|
|
dev[1].request("DISCONNECT")
|
|
dev[0].dump_monitor()
|
|
|
|
# Wait for EAPOL-Key msg 1/2
|
|
conn.msg = recv_eapol(conn.hapd)
|
|
if conn.msg["rsn_key_info"] != 4994:
|
|
raise Exception("Didn't receive 1/2 of group key handshake")
|
|
|
|
# Send a EAPOL-Key msg 2/2 with a bad OCI
|
|
logger.info("Bad OCI element")
|
|
ocikde = make_ocikde(1, 1, 1)
|
|
msg = build_eapol_key_2_2(conn.kck, ocikde, replay_counter=3)
|
|
conn.hapd.dump_monitor()
|
|
send_eapol(conn.hapd, conn.addr, build_eapol(msg))
|
|
|
|
# Wait for retransmitted EAPOL-Key msg 1/2
|
|
conn.msg = recv_eapol(conn.hapd)
|
|
if conn.msg["rsn_key_info"] != 4994:
|
|
raise Exception("Didn't receive 1/2 of group key handshake")
|
|
|
|
# Send a EAPOL-Key msg 2/2 with a good OCI
|
|
logger.info("Good OCI element")
|
|
ocikde = make_ocikde(81, 1, 0)
|
|
msg = build_eapol_key_2_2(conn.kck, ocikde, replay_counter=4)
|
|
conn.hapd.dump_monitor()
|
|
send_eapol(conn.hapd, conn.addr, build_eapol(msg))
|
|
|
|
# Verify that group key handshake has completed
|
|
ev = conn.hapd.wait_event(["EAPOL-TX"], timeout=1)
|
|
if ev is not None:
|
|
eapol = binascii.unhexlify(ev.split(' ')[2])
|
|
msg = parse_eapol(eapol)
|
|
if msg["rsn_key_info"] == 4994:
|
|
raise Exception("AP didn't accept 2/2 of group key handshake")
|
|
|
|
class STAConnection:
|
|
def init_params(self):
|
|
# Static parameters
|
|
self.ssid = "test-wpa2-ocv"
|
|
self.passphrase = "qwertyuiop"
|
|
self.psk = "c2c6c255af836bed1b3f2f1ded98e052f5ad618bb554e2836757b55854a0eab7"
|
|
|
|
# Dynamic parameters
|
|
self.hapd = None
|
|
self.dev = None
|
|
self.addr = None
|
|
self.rsne = None
|
|
self.kck = None
|
|
self.kek = None
|
|
self.msg = None
|
|
self.bssid = None
|
|
self.anonce = None
|
|
self.snonce = None
|
|
self.gtkie = None
|
|
self.counter = None
|
|
|
|
def __init__(self, apdev, dev, params, sta_params=None):
|
|
self.init_params()
|
|
self.dev = dev
|
|
self.bssid = apdev['bssid']
|
|
|
|
freq = params.pop("freq")
|
|
if sta_params is None:
|
|
sta_params = dict()
|
|
if not "ocv" in sta_params:
|
|
sta_params["ocv"] = "1"
|
|
if not "ieee80211w" in sta_params:
|
|
sta_params["ieee80211w"] = "1"
|
|
|
|
params.update(hostapd.wpa2_params(ssid=self.ssid,
|
|
passphrase=self.passphrase))
|
|
params['wpa_pairwise_update_count'] = "10"
|
|
|
|
try:
|
|
self.hapd = hostapd.add_ap(apdev, params)
|
|
except Exception, e:
|
|
if "Failed to set hostapd parameter ocv" in str(e):
|
|
raise HwsimSkip("OCV not supported")
|
|
raise
|
|
self.hapd.request("SET ext_eapol_frame_io 1")
|
|
self.dev.request("SET ext_eapol_frame_io 1")
|
|
pmk = binascii.unhexlify("c2c6c255af836bed1b3f2f1ded98e052f5ad618bb554e2836757b55854a0eab7")
|
|
|
|
self.gtkie = binascii.unhexlify("dd16000fac010100dc11188831bf4aa4a8678d2b41498618")
|
|
if sta_params["ocv"] != "0":
|
|
self.rsne = binascii.unhexlify("30140100000fac040100000fac040100000fac028c40")
|
|
else:
|
|
self.rsne = binascii.unhexlify("30140100000fac040100000fac040100000fac028c00")
|
|
|
|
self.dev.connect(self.ssid, raw_psk=self.psk, scan_freq=freq,
|
|
wait_connect=False, **sta_params)
|
|
self.addr = dev.p2p_interface_addr()
|
|
|
|
# Forward msg 1/4 from AP to STA
|
|
self.msg = recv_eapol(self.hapd)
|
|
self.anonce = self.msg['rsn_key_nonce']
|
|
send_eapol(self.dev, self.bssid, build_eapol(self.msg))
|
|
|
|
# Capture msg 2/4 from the STA so we can derive the session keys
|
|
self.msg = recv_eapol(dev)
|
|
self.snonce = self.msg['rsn_key_nonce']
|
|
(ptk, self.kck, self.kek) = pmk_to_ptk(pmk, self.addr, self.bssid,
|
|
self.snonce,self.anonce)
|
|
|
|
self.counter = struct.unpack('>Q',
|
|
self.msg['rsn_replay_counter'])[0] + 1
|
|
|
|
def test_bad_oci(self, logmsg, op_class, channel, seg1_idx, errmsg):
|
|
logger.info("Bad OCI element: " + logmsg)
|
|
if op_class is None:
|
|
ocikde = ""
|
|
else:
|
|
ocikde = make_ocikde(op_class, channel, seg1_idx)
|
|
|
|
plain = self.rsne + self.gtkie + ocikde
|
|
wrapped = aes_wrap(self.kek, pad_key_data(plain))
|
|
msg = build_eapol_key_3_4(self.anonce, self.kck, wrapped,
|
|
replay_counter=self.counter)
|
|
|
|
self.dev.dump_monitor()
|
|
send_eapol(self.dev, self.bssid, build_eapol(msg))
|
|
self.counter += 1
|
|
|
|
ev = self.dev.wait_event([errmsg], timeout=5)
|
|
if ev is None:
|
|
raise Exception("Bad OCI not reported")
|
|
|
|
def confirm_valid_oci(self, op_class, channel, seg1_idx):
|
|
logger.debug("Valid OCI element to complete handshake")
|
|
ocikde = make_ocikde(op_class, channel, seg1_idx)
|
|
|
|
plain = self.rsne + self.gtkie + ocikde
|
|
wrapped = aes_wrap(self.kek, pad_key_data(plain))
|
|
msg = build_eapol_key_3_4(self.anonce, self.kck, wrapped,
|
|
replay_counter=self.counter)
|
|
|
|
self.dev.dump_monitor()
|
|
send_eapol(self.dev, self.bssid, build_eapol(msg))
|
|
self.counter += 1
|
|
|
|
self.dev.wait_connected(timeout=1)
|
|
|
|
@remote_compatible
|
|
def test_wpa2_ocv_mismatch_client(dev, apdev):
|
|
"""OCV client mismatch"""
|
|
params = { "channel": "1",
|
|
"ieee80211w": "1",
|
|
"ocv": "1",
|
|
"freq": "2412" }
|
|
conn = STAConnection(apdev[0], dev[0], params)
|
|
conn.test_bad_oci("element missing", None, 0, 0,
|
|
"did not receive mandatory OCI")
|
|
conn.test_bad_oci("wrong channel number", 81, 6, 0,
|
|
"primary channel mismatch")
|
|
conn.test_bad_oci("invalid channel number", 81, 0, 0,
|
|
"unable to interpret received OCI")
|
|
conn.test_bad_oci("wrong operating class", 80, 0, 0,
|
|
"unable to interpret received OCI")
|
|
conn.test_bad_oci("invalid operating class", 0, 0, 0,
|
|
"unable to interpret received OCI")
|
|
conn.confirm_valid_oci(81, 1, 0)
|
|
|
|
@remote_compatible
|
|
def test_wpa2_ocv_vht160_mismatch_client(dev, apdev):
|
|
"""OCV client mismatch (VHT160)"""
|
|
try:
|
|
run_wpa2_ocv_vht160_mismatch_client(dev, apdev)
|
|
finally:
|
|
set_world_reg(apdev[0], apdev[1], dev[0])
|
|
dev[0].flush_scan_cache()
|
|
|
|
def run_wpa2_ocv_vht160_mismatch_client(dev, apdev):
|
|
params = { "hw_mode": "a",
|
|
"channel": "100",
|
|
"country_code": "ZA",
|
|
"ht_capab": "[HT40+]",
|
|
"ieee80211w": "1",
|
|
"ieee80211n": "1",
|
|
"ieee80211ac": "1",
|
|
"vht_oper_chwidth": "2",
|
|
"ocv": "1",
|
|
"vht_oper_centr_freq_seg0_idx": "114",
|
|
"freq": "5500",
|
|
"ieee80211d": "1",
|
|
"ieee80211h": "1" }
|
|
sta_params = { "disable_vht": "1" }
|
|
conn = STAConnection(apdev[0], dev[0], params, sta_params)
|
|
conn.test_bad_oci("smaller bandwidth (20 Mhz) than negotiated",
|
|
121, 100, 0, "channel bandwidth mismatch")
|
|
conn.test_bad_oci("wrong frequency, bandwith, and secondary channel",
|
|
123, 104, 0, "primary channel mismatch")
|
|
conn.test_bad_oci("wrong upper/lower behaviour",
|
|
129, 104, 0, "primary channel mismatch")
|
|
conn.confirm_valid_oci(122, 100, 0)
|
|
|
|
def test_wpa2_ocv_sta_group_hs(dev, apdev):
|
|
"""OCV group handshake (STA)"""
|
|
params = { "channel": "1",
|
|
"ieee80211w": "1",
|
|
"ocv": "1",
|
|
"freq": "2412",
|
|
"wpa_strict_rekey": "1" }
|
|
conn = STAConnection(apdev[0], dev[0], params.copy())
|
|
conn.confirm_valid_oci(81, 1, 0)
|
|
|
|
# Send a EAPOL-Key msg 1/2 with a bad OCI
|
|
logger.info("Bad OCI element")
|
|
plain = conn.gtkie + make_ocikde(1, 1, 1)
|
|
wrapped = aes_wrap(conn.kek, pad_key_data(plain))
|
|
msg = build_eapol_key_1_2(conn.kck, wrapped, replay_counter=3)
|
|
send_eapol(dev[0], conn.bssid, build_eapol(msg))
|
|
|
|
# We shouldn't get a EAPOL-Key message back
|
|
ev = dev[0].wait_event(["EAPOL-TX"], timeout=1)
|
|
if ev is not None:
|
|
raise Exception("Received response to invalid EAPOL-Key 1/2")
|
|
|
|
# Reset AP to try with valid OCI
|
|
conn.hapd.disable()
|
|
conn = STAConnection(apdev[0], dev[0], params.copy())
|
|
conn.confirm_valid_oci(81, 1, 0)
|
|
|
|
# Send a EAPOL-Key msg 1/2 with a good OCI
|
|
logger.info("Good OCI element")
|
|
plain = conn.gtkie + make_ocikde(81, 1, 0)
|
|
wrapped = aes_wrap(conn.kek, pad_key_data(plain))
|
|
msg = build_eapol_key_1_2(conn.kck, wrapped, replay_counter=4)
|
|
send_eapol(dev[0], conn.bssid, build_eapol(msg))
|
|
|
|
# Wait for EAPOL-Key msg 2/2
|
|
conn.msg = recv_eapol(dev[0])
|
|
if conn.msg["rsn_key_info"] != 0x0302:
|
|
raise Exception("Didn't receive 2/2 of group key handshake")
|