tests: Add test cases for PMF

Signed-hostap: Jouni Malinen <j@w1.fi>
This commit is contained in:
Jouni Malinen 2013-03-30 13:47:22 +02:00
parent 2f9b66d3ab
commit 835a546b20
5 changed files with 215 additions and 30 deletions

View file

@ -103,7 +103,7 @@ def add_ap(ifname, params):
raise Exception("Could not ping hostapd") raise Exception("Could not ping hostapd")
hapd.set_defaults() hapd.set_defaults()
fields = [ "ssid", "wpa_passphrase", "wpa", "wpa_key_mgmt", fields = [ "ssid", "wpa_passphrase", "wpa", "wpa_key_mgmt",
"wpa_pairwise", "rsn_pairwise", "wep_key0" ] "wpa_pairwise", "rsn_pairwise", "ieee80211w", "wep_key0" ]
for field in fields: for field in fields:
if field in params: if field in params:
hapd.set(field, params[field]) hapd.set(field, params[field])

View file

@ -0,0 +1,93 @@
#!/usr/bin/python
#
# Protected management frames tests
# Copyright (c) 2013, Jouni Malinen <j@w1.fi>
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
import time
import subprocess
import logging
logger = logging.getLogger(__name__)
import hwsim_utils
import hostapd
from wlantest import Wlantest
ap_ifname = 'wlan2'
bssid = "02:00:00:00:02:00"
def test_ap_pmf_required(dev):
"""WPA2-PSK AP with PMF required"""
ssid = "test-pmf-required"
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
params["wpa_key_mgmt"] = "WPA-PSK-SHA256";
params["ieee80211w"] = "2";
hostapd.add_ap(ap_ifname, params)
dev[0].connect(ssid, psk="12345678", ieee80211w="1", key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2")
hwsim_utils.test_connectivity(dev[0].ifname, ap_ifname)
dev[1].connect(ssid, psk="12345678", ieee80211w="2", key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2")
hwsim_utils.test_connectivity(dev[1].ifname, ap_ifname)
wt.require_ap_pmf_mandatory(bssid)
wt.require_sta_pmf(bssid, dev[0].p2p_interface_addr())
wt.require_sta_pmf_mandatory(bssid, dev[1].p2p_interface_addr())
def test_ap_pmf_optional(dev):
"""WPA2-PSK AP with PMF optional"""
ssid = "test-pmf-optional"
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
params["wpa_key_mgmt"] = "WPA-PSK";
params["ieee80211w"] = "1";
hostapd.add_ap(ap_ifname, params)
dev[0].connect(ssid, psk="12345678", ieee80211w="1", key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2")
hwsim_utils.test_connectivity(dev[0].ifname, ap_ifname)
dev[1].connect(ssid, psk="12345678", ieee80211w="2", key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2")
hwsim_utils.test_connectivity(dev[1].ifname, ap_ifname)
wt.require_ap_pmf_optional(bssid)
wt.require_sta_pmf(bssid, dev[0].p2p_interface_addr())
wt.require_sta_pmf_mandatory(bssid, dev[1].p2p_interface_addr())
def test_ap_pmf_optional_2akm(dev):
"""WPA2-PSK AP with PMF optional (2 AKMs)"""
ssid = "test-pmf-optional-2akm"
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
params["wpa_key_mgmt"] = "WPA-PSK WPA-PSK-SHA256";
params["ieee80211w"] = "1";
hostapd.add_ap(ap_ifname, params)
dev[0].connect(ssid, psk="12345678", ieee80211w="1", key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2")
hwsim_utils.test_connectivity(dev[0].ifname, ap_ifname)
dev[1].connect(ssid, psk="12345678", ieee80211w="2", key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2")
hwsim_utils.test_connectivity(dev[1].ifname, ap_ifname)
wt.require_ap_pmf_optional(bssid)
wt.require_sta_pmf(bssid, dev[0].p2p_interface_addr())
wt.require_sta_key_mgmt(bssid, dev[0].p2p_interface_addr(), "PSK-SHA256")
wt.require_sta_pmf_mandatory(bssid, dev[1].p2p_interface_addr())
wt.require_sta_key_mgmt(bssid, dev[1].p2p_interface_addr(), "PSK-SHA256")
def test_ap_pmf_negative(dev):
"""WPA2-PSK AP without PMF (negative test)"""
ssid = "test-pmf-negative"
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
hostapd.add_ap(ap_ifname, params)
dev[0].connect(ssid, psk="12345678", ieee80211w="1", key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2")
hwsim_utils.test_connectivity(dev[0].ifname, ap_ifname)
try:
dev[1].connect(ssid, psk="12345678", ieee80211w="2", key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2")
hwsim_utils.test_connectivity(dev[1].ifname, ap_ifname)
raise Exception("PMF required STA connected to no PMF AP")
except Exception, e:
logger.debug("Ignore expected exception: " + str(e))
wt.require_ap_no_pmf(bssid)

View file

@ -7,7 +7,6 @@
# See README for more details. # See README for more details.
import time import time
import subprocess
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -15,6 +14,7 @@ import hwsim_utils
from hostapd import HostapdGlobal from hostapd import HostapdGlobal
from hostapd import Hostapd from hostapd import Hostapd
import hostapd import hostapd
from wlantest import Wlantest
ap_ifname = 'wlan2' ap_ifname = 'wlan2'
@ -53,34 +53,23 @@ def connect_2sta_open(dev):
dev[1].connect("test-open", key_mgmt="NONE") dev[1].connect("test-open", key_mgmt="NONE")
connectivity(dev, ap_ifname) connectivity(dev, ap_ifname)
def wlantest_tdls(field, bssid, addr1, addr2):
res = subprocess.check_output(["../../wlantest/wlantest_cli",
"get_tdls_counter", field, bssid, addr1,
addr2]);
if "FAIL" in res:
raise Exception("wlantest_cli command failed")
return int(res)
def wlantest_tdls_clear(bssid, addr1, addr2):
subprocess.call(["../../wlantest/wlantest_cli",
"clear_tdls_counters", bssid, addr1, addr2]);
def wlantest_setup(): def wlantest_setup():
subprocess.call(["../../wlantest/wlantest_cli", "flush"]); wt = Wlantest()
subprocess.call(["../../wlantest/wlantest_cli", "add_passphrase", wt.flush()
"12345678"]); wt.add_passphrase("12345678")
subprocess.call(["../../wlantest/wlantest_cli", "add_wepkey", wt.add_wepkey("68656c6c6f")
"68656c6c6f"]);
def wlantest_tdls_packet_counters(bssid, addr0, addr1): def wlantest_tdls_packet_counters(bssid, addr0, addr1):
dl = wlantest_tdls("valid_direct_link", bssid, addr0, addr1); wt = Wlantest()
inv_dl = wlantest_tdls("invalid_direct_link", bssid, addr0, addr1); dl = wt.get_tdls_counter("valid_direct_link", bssid, addr0, addr1)
ap = wlantest_tdls("valid_ap_path", bssid, addr0, addr1); inv_dl = wt.get_tdls_counter("invalid_direct_link", bssid, addr0, addr1)
inv_ap = wlantest_tdls("invalid_ap_path", bssid, addr0, addr1); ap = wt.get_tdls_counter("valid_ap_path", bssid, addr0, addr1)
inv_ap = wt.get_tdls_counter("invalid_ap_path", bssid, addr0, addr1)
return [dl,inv_dl,ap,inv_ap] return [dl,inv_dl,ap,inv_ap]
def tdls_check_dl(sta0, sta1, bssid, addr0, addr1): def tdls_check_dl(sta0, sta1, bssid, addr0, addr1):
wlantest_tdls_clear(bssid, addr0, addr1); wt = Wlantest()
wt.tdls_clear(bssid, addr0, addr1)
hwsim_utils.test_connectivity_sta(sta0, sta1) hwsim_utils.test_connectivity_sta(sta0, sta1)
[dl,inv_dl,ap,inv_ap] = wlantest_tdls_packet_counters(bssid, addr0, addr1) [dl,inv_dl,ap,inv_ap] = wlantest_tdls_packet_counters(bssid, addr0, addr1)
if dl == 0: if dl == 0:
@ -93,7 +82,8 @@ def tdls_check_dl(sta0, sta1, bssid, addr0, addr1):
raise Exception("Invalid frames through AP path") raise Exception("Invalid frames through AP path")
def tdls_check_ap(sta0, sta1, bssid, addr0, addr1): def tdls_check_ap(sta0, sta1, bssid, addr0, addr1):
wlantest_tdls_clear(bssid, addr0, addr1); wt = Wlantest()
wt.tdls_clear(bssid, addr0, addr1);
hwsim_utils.test_connectivity_sta(sta0, sta1) hwsim_utils.test_connectivity_sta(sta0, sta1)
[dl,inv_dl,ap,inv_ap] = wlantest_tdls_packet_counters(bssid, addr0, addr1) [dl,inv_dl,ap,inv_ap] = wlantest_tdls_packet_counters(bssid, addr0, addr1)
if dl > 0: if dl > 0:
@ -109,8 +99,9 @@ def setup_tdls(sta0, sta1, bssid, reverse=False, expect_fail=False):
logger.info("Setup TDLS") logger.info("Setup TDLS")
addr0 = sta0.p2p_interface_addr() addr0 = sta0.p2p_interface_addr()
addr1 = sta1.p2p_interface_addr() addr1 = sta1.p2p_interface_addr()
wlantest_tdls_clear(bssid, addr0, addr1); wt = Wlantest()
wlantest_tdls_clear(bssid, addr1, addr0); wt.tdls_clear(bssid, addr0, addr1);
wt.tdls_clear(bssid, addr1, addr0);
sta0.tdls_setup(addr1) sta0.tdls_setup(addr1)
time.sleep(1) time.sleep(1)
if expect_fail: if expect_fail:
@ -119,7 +110,7 @@ def setup_tdls(sta0, sta1, bssid, reverse=False, expect_fail=False):
if reverse: if reverse:
addr1 = sta0.p2p_interface_addr() addr1 = sta0.p2p_interface_addr()
addr0 = sta1.p2p_interface_addr() addr0 = sta1.p2p_interface_addr()
conf = wlantest_tdls("setup_conf_ok", bssid, addr0, addr1); conf = wt.get_tdls_counter("setup_conf_ok", bssid, addr0, addr1);
if conf == 0: if conf == 0:
raise Exception("No TDLS Setup Confirm (success) seen") raise Exception("No TDLS Setup Confirm (success) seen")
tdls_check_dl(sta0, sta1, bssid, addr0, addr1) tdls_check_dl(sta0, sta1, bssid, addr0, addr1)
@ -130,7 +121,8 @@ def teardown_tdls(sta0, sta1, bssid):
addr1 = sta1.p2p_interface_addr() addr1 = sta1.p2p_interface_addr()
sta0.tdls_teardown(addr1) sta0.tdls_teardown(addr1)
time.sleep(1) time.sleep(1)
teardown = wlantest_tdls("teardown", bssid, addr0, addr1); wt = Wlantest()
teardown = wt.get_tdls_counter("teardown", bssid, addr0, addr1);
if teardown == 0: if teardown == 0:
raise Exception("No TDLS Setup Teardown seen") raise Exception("No TDLS Setup Teardown seen")
tdls_check_ap(sta0, sta1, bssid, addr0, addr1) tdls_check_ap(sta0, sta1, bssid, addr0, addr1)

98
tests/hwsim/wlantest.py Normal file
View file

@ -0,0 +1,98 @@
#!/usr/bin/python
#
# Python class for controlling wlantest
# Copyright (c) 2013, Jouni Malinen <j@w1.fi>
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
import os
import time
import subprocess
import logging
import wpaspy
logger = logging.getLogger(__name__)
wlantest_cli = '../../wlantest/wlantest_cli'
class Wlantest:
def flush(self):
res = subprocess.check_output([wlantest_cli, "flush"])
if "FAIL" in res:
raise Exception("wlantest_cli flush failed")
def add_passphrase(self, passphrase):
res = subprocess.check_output([wlantest_cli, "add_passphrase",
passphrase])
if "FAIL" in res:
raise Exception("wlantest_cli add_passphrase failed")
def add_wepkey(self, key):
res = subprocess.check_output([wlantest_cli, "add_wepkey", key])
if "FAIL" in res:
raise Exception("wlantest_cli add_key failed")
def info_bss(self, field, bssid):
res = subprocess.check_output([wlantest_cli, "info_bss", field, bssid])
if "FAIL" in res:
raise Exception("Could not get BSS info from wlantest for " + bssid)
return res
def info_sta(self, field, bssid, addr):
res = subprocess.check_output([wlantest_cli, "info_sta", field, bssid,
addr])
if "FAIL" in res:
raise Exception("Could not get STA info from wlantest for " + addr)
return res
def tdls_clear(self, bssid, addr1, addr2):
subprocess.call([wlantest_cli, "clear_tdls_counters", bssid, addr1,
addr2]);
def get_tdls_counter(self, field, bssid, addr1, addr2):
res = subprocess.check_output([wlantest_cli, "get_tdls_counter", field,
bssid, addr1, addr2]);
if "FAIL" in res:
raise Exception("wlantest_cli command failed")
return int(res)
def require_ap_pmf_mandatory(self, bssid):
res = self.info_bss("rsn_capab", bssid)
if "MFPR" not in res:
raise Exception("AP did not require PMF")
if "MFPC" not in res:
raise Exception("AP did not enable PMF")
res = self.info_bss("key_mgmt", bssid)
if "PSK-SHA256" not in res:
raise Exception("AP did not enable SHA256-based AKM for PMF")
def require_ap_pmf_optional(self, bssid):
res = self.info_bss("rsn_capab", bssid)
if "MFPR" in res:
raise Exception("AP required PMF")
if "MFPC" not in res:
raise Exception("AP did not enable PMF")
def require_ap_no_pmf(self, bssid):
res = self.info_bss("rsn_capab", bssid)
if "MFPR" in res:
raise Exception("AP required PMF")
if "MFPC" in res:
raise Exception("AP enabled PMF")
def require_sta_pmf_mandatory(self, bssid, addr):
res = self.info_sta("rsn_capab", bssid, addr)
if "MFPR" not in res:
raise Exception("STA did not require PMF")
if "MFPC" not in res:
raise Exception("STA did not enable PMF")
def require_sta_pmf(self, bssid, addr):
res = self.info_sta("rsn_capab", bssid, addr)
if "MFPC" not in res:
raise Exception("STA did not enable PMF")
def require_sta_key_mgmt(self, bssid, addr, key_mgmt):
res = self.info_sta("key_mgmt", bssid, addr)
if key_mgmt not in res:
raise Exception("Unexpected STA key_mgmt")

View file

@ -302,7 +302,7 @@ class WpaSupplicant:
raise Exception("Failed to request TDLS teardown") raise Exception("Failed to request TDLS teardown")
return None return None
def connect(self, ssid, psk=None, proto=None, key_mgmt=None, wep_key0=None): def connect(self, ssid, psk=None, proto=None, key_mgmt=None, wep_key0=None, ieee80211w=None):
logger.info("Connect STA " + self.ifname + " to AP") logger.info("Connect STA " + self.ifname + " to AP")
id = self.add_network() id = self.add_network()
self.set_network_quoted(id, "ssid", ssid) self.set_network_quoted(id, "ssid", ssid)
@ -312,6 +312,8 @@ class WpaSupplicant:
self.set_network(id, "proto", proto) self.set_network(id, "proto", proto)
if key_mgmt: if key_mgmt:
self.set_network(id, "key_mgmt", key_mgmt) self.set_network(id, "key_mgmt", key_mgmt)
if ieee80211w:
self.set_network(id, "ieee80211w", ieee80211w)
if wep_key0: if wep_key0:
self.set_network(id, "wep_key0", wep_key0) self.set_network(id, "wep_key0", wep_key0)
self.connect_network(id) self.connect_network(id)