tests: Verify behavior on incorrect GAS response type
Signed-hostap: Jouni Malinen <j@w1.fi>
This commit is contained in:
parent
2cace98ee1
commit
bfe375ec77
2 changed files with 79 additions and 4 deletions
|
@ -9,12 +9,17 @@
|
||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
import logging
|
import logging
|
||||||
|
import binascii
|
||||||
|
import struct
|
||||||
import wpaspy
|
import wpaspy
|
||||||
|
|
||||||
logger = logging.getLogger()
|
logger = logging.getLogger()
|
||||||
hapd_ctrl = '/var/run/hostapd'
|
hapd_ctrl = '/var/run/hostapd'
|
||||||
hapd_global = '/var/run/hostapd-global'
|
hapd_global = '/var/run/hostapd-global'
|
||||||
|
|
||||||
|
def mac2tuple(mac):
|
||||||
|
return struct.unpack('6B', binascii.unhexlify(mac.replace(':','')))
|
||||||
|
|
||||||
class HostapdGlobal:
|
class HostapdGlobal:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.ctrl = wpaspy.Ctrl(hapd_global)
|
self.ctrl = wpaspy.Ctrl(hapd_global)
|
||||||
|
@ -144,6 +149,36 @@ class Hostapd:
|
||||||
return vals[field]
|
return vals[field]
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def mgmt_rx(self, timeout=5):
|
||||||
|
ev = self.wait_event(["MGMT-RX"], timeout=timeout)
|
||||||
|
if ev is None:
|
||||||
|
return None
|
||||||
|
msg = {}
|
||||||
|
frame = binascii.unhexlify(ev.split(' ')[1])
|
||||||
|
msg['frame'] = frame
|
||||||
|
|
||||||
|
hdr = struct.unpack('<HH6B6B6BH', frame[0:24])
|
||||||
|
msg['fc'] = hdr[0]
|
||||||
|
msg['subtype'] = (hdr[0] >> 4) & 0xf
|
||||||
|
hdr = hdr[1:]
|
||||||
|
msg['duration'] = hdr[0]
|
||||||
|
hdr = hdr[1:]
|
||||||
|
msg['da'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
|
||||||
|
hdr = hdr[6:]
|
||||||
|
msg['sa'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
|
||||||
|
hdr = hdr[6:]
|
||||||
|
msg['bssid'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
|
||||||
|
hdr = hdr[6:]
|
||||||
|
msg['seq_ctrl'] = hdr[0]
|
||||||
|
msg['payload'] = frame[24:]
|
||||||
|
|
||||||
|
return msg
|
||||||
|
|
||||||
|
def mgmt_tx(self, msg):
|
||||||
|
t = (msg['fc'], 0) + mac2tuple(msg['da']) + mac2tuple(msg['sa']) + mac2tuple(msg['bssid']) + (0,)
|
||||||
|
hdr = struct.pack('<HH6B6B6BH', *t)
|
||||||
|
self.request("MGMT_TX " + binascii.hexlify(hdr + msg['payload']))
|
||||||
|
|
||||||
def add_ap(ifname, params):
|
def add_ap(ifname, params):
|
||||||
logger.info("Starting AP " + ifname)
|
logger.info("Starting AP " + ifname)
|
||||||
hapd_global = HostapdGlobal()
|
hapd_global = HostapdGlobal()
|
||||||
|
|
|
@ -7,6 +7,7 @@
|
||||||
# See README for more details.
|
# See README for more details.
|
||||||
|
|
||||||
import time
|
import time
|
||||||
|
import binascii
|
||||||
import logging
|
import logging
|
||||||
logger = logging.getLogger()
|
logger = logging.getLogger()
|
||||||
import re
|
import re
|
||||||
|
@ -212,6 +213,13 @@ def test_gas_comeback_delay(dev, apdev):
|
||||||
if ev is None:
|
if ev is None:
|
||||||
raise Exception("Operation timed out")
|
raise Exception("Operation timed out")
|
||||||
|
|
||||||
|
def expect_gas_result(dev, result):
|
||||||
|
ev = dev.wait_event(["GAS-QUERY-DONE"], timeout=10)
|
||||||
|
if ev is None:
|
||||||
|
raise Exception("GAS query timed out")
|
||||||
|
if "result=" + result not in ev:
|
||||||
|
raise Exception("Unexpected GAS query result")
|
||||||
|
|
||||||
def test_gas_timeout(dev, apdev):
|
def test_gas_timeout(dev, apdev):
|
||||||
"""GAS timeout"""
|
"""GAS timeout"""
|
||||||
bssid = apdev[0]['bssid']
|
bssid = apdev[0]['bssid']
|
||||||
|
@ -232,8 +240,40 @@ def test_gas_timeout(dev, apdev):
|
||||||
if ev is None:
|
if ev is None:
|
||||||
raise Exception("MGMT RX wait timed out")
|
raise Exception("MGMT RX wait timed out")
|
||||||
|
|
||||||
ev = dev[0].wait_event(["GAS-QUERY-DONE"], timeout=10)
|
expect_gas_result(dev[0], "TIMEOUT")
|
||||||
|
|
||||||
|
def test_gas_invalid_response_type(dev, apdev):
|
||||||
|
"""GAS invalid response type"""
|
||||||
|
bssid = apdev[0]['bssid']
|
||||||
|
params = hs20_ap_params()
|
||||||
|
params['hessid'] = bssid
|
||||||
|
hostapd.add_ap(apdev[0]['ifname'], params)
|
||||||
|
hapd = hostapd.Hostapd(apdev[0]['ifname'])
|
||||||
|
|
||||||
|
dev[0].scan(freq="2412")
|
||||||
|
hapd.set("ext_mgmt_frame_handling", "1")
|
||||||
|
|
||||||
|
dev[0].request("ANQP_GET " + bssid + " 263")
|
||||||
|
ev = dev[0].wait_event(["GAS-QUERY-START"], timeout=5)
|
||||||
if ev is None:
|
if ev is None:
|
||||||
raise Exception("GAS query timed out")
|
raise Exception("GAS query start timed out")
|
||||||
if "result=TIMEOUT" not in ev:
|
|
||||||
raise Exception("Unexpected GAS query result")
|
query = hapd.mgmt_rx()
|
||||||
|
if query is None:
|
||||||
|
raise Exception("GAS query request not received")
|
||||||
|
resp = {}
|
||||||
|
resp['fc'] = query['fc']
|
||||||
|
resp['da'] = query['sa']
|
||||||
|
resp['sa'] = query['da']
|
||||||
|
resp['bssid'] = query['bssid']
|
||||||
|
# GAS Comeback Response instead of GAS Initial Response
|
||||||
|
resp['payload'] = binascii.unhexlify("040d" + binascii.hexlify(query['payload'][2]) + "0000000000" + "6c027f00" + "0000")
|
||||||
|
hapd.mgmt_tx(resp)
|
||||||
|
ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
|
||||||
|
if ev is None:
|
||||||
|
raise Exception("Missing TX status for GAS response")
|
||||||
|
if "ok=1" not in ev:
|
||||||
|
raise Exception("GAS response not acknowledged")
|
||||||
|
|
||||||
|
# station drops the invalid frame, so this needs to result in GAS timeout
|
||||||
|
expect_gas_result(dev[0], "TIMEOUT")
|
||||||
|
|
Loading…
Reference in a new issue