tests: Add FST module tests

Signed-off-by: Jouni Malinen <jouni@qca.qualcomm.com>
This commit is contained in:
Anton Nayshtut 2014-08-05 18:25:59 +03:00 committed by Jouni Malinen
parent a8dab08a28
commit 41a256ecd9
10 changed files with 3060 additions and 2 deletions

View file

@ -71,6 +71,9 @@ CONFIG_SQLITE=y
CONFIG_SAE=y
CFLAGS += -DALL_DH_GROUPS
CONFIG_FST=y
CONFIG_FST_TEST=y
CONFIG_TESTING_OPTIONS=y
CONFIG_MODULE_TESTS=y

View file

@ -116,6 +116,9 @@ CFLAGS += -DALL_DH_GROUPS
CONFIG_WNM=y
CONFIG_FST=y
CONFIG_FST_TEST=y
CONFIG_TESTING_OPTIONS=y
CONFIG_MODULE_TESTS=y

View file

@ -0,0 +1,745 @@
# FST tests related classes
# Copyright (c) 2015, Qualcomm Atheros, Inc.
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
import logging
import subprocess
import os
import signal
import time
import re
import hostapd
import wpaspy
import utils
from wpasupplicant import WpaSupplicant
import fst_test_common
logger = logging.getLogger()
def parse_fst_iface_event(ev):
"""Parses FST iface event that comes as a string, e.g.
"<3>FST-EVENT-IFACE attached ifname=wlan9 group=fstg0"
Returns a dictionary with parsed "event_type", "ifname", and "group"; or
None if not an FST event or can't be parsed."""
event = {}
if ev.find("FST-EVENT-IFACE") == -1:
return None
if ev.find("attached") != -1:
event['event_type'] = 'attached'
elif ev.find("detached") != -1:
event['event_type'] = 'detached'
else:
return None
f = re.search("ifname=(\S+)", ev)
if f is not None:
event['ifname'] = f.group(1)
f = re.search("group=(\S+)", ev)
if f is not None:
event['group'] = f.group(1)
return event
def parse_fst_session_event(ev):
"""Parses FST session event that comes as a string, e.g.
"<3>FST-EVENT-SESSION event_type=EVENT_FST_SESSION_STATE session_id=0 reason=REASON_STT"
Returns a dictionary with parsed "type", "id", and "reason"; or None if not
a FST event or can't be parsed"""
event = {}
if ev.find("FST-EVENT-SESSION") == -1:
return None
event['new_state'] = '' # The field always exists in the dictionary
f = re.search("event_type=(\S+)", ev)
if f is None:
return None
event['type'] = f.group(1)
f = re.search("session_id=(\d+)", ev)
if f is not None:
event['id'] = f.group(1)
f = re.search("old_state=(\S+)", ev)
if f is not None:
event['old_state'] = f.group(1)
f = re.search("new_state=(\S+)", ev)
if f is not None:
event['new_state'] = f.group(1)
f = re.search("reason=(\S+)", ev)
if f is not None:
event['reason'] = f.group(1)
return event
def start_two_ap_sta_pairs(apdev):
"""auxiliary function that creates two pairs of APs and STAs"""
ap1 = FstAP(apdev[0]['ifname'], 'fst_11a', 'a',
fst_test_common.fst_test_def_chan_a,
fst_test_common.fst_test_def_group,
fst_test_common.fst_test_def_prio_low,
fst_test_common.fst_test_def_llt)
ap1.start()
ap2 = FstAP(apdev[1]['ifname'], 'fst_11g', 'g',
fst_test_common.fst_test_def_chan_g,
fst_test_common.fst_test_def_group,
fst_test_common.fst_test_def_prio_high,
fst_test_common.fst_test_def_llt)
ap2.start()
sta1 = FstSTA('wlan5',
fst_test_common.fst_test_def_group,
fst_test_common.fst_test_def_prio_low,
fst_test_common.fst_test_def_llt)
sta1.start()
sta2 = FstSTA('wlan6',
fst_test_common.fst_test_def_group,
fst_test_common.fst_test_def_prio_high,
fst_test_common.fst_test_def_llt)
sta2.start()
return ap1, ap2, sta1, sta2
def stop_two_ap_sta_pairs(ap1, ap2, sta1, sta2):
sta1.stop()
sta2.stop()
ap1.stop()
ap2.stop()
def connect_two_ap_sta_pairs(ap1, ap2, dev1, dev2):
"""Connects a pair of stations, each one to a separate AP"""
dev1.scan(freq=fst_test_common.fst_test_def_freq_a)
dev2.scan(freq=fst_test_common.fst_test_def_freq_g)
dev1.connect(ap1, key_mgmt="NONE",
scan_freq=fst_test_common.fst_test_def_freq_a)
dev2.connect(ap2, key_mgmt="NONE",
scan_freq=fst_test_common.fst_test_def_freq_g)
def disconnect_two_ap_sta_pairs(ap1, ap2, dev1, dev2):
dev1.disconnect()
dev2.disconnect()
def external_sta_connect(sta, ap, **kwargs):
"""Connects the external station to the given AP"""
if not isinstance(sta, WpaSupplicant):
raise Exception("Bad STA object")
if not isinstance(ap, FstAP):
raise Exception("Bad AP object to connect to")
hap = ap.get_instance()
sta.connect(ap.get_ssid(), **kwargs)
def disconnect_external_sta(sta, ap, check_disconnect=True):
"""Disconnects the external station from the AP"""
if not isinstance(sta, WpaSupplicant):
raise Exception("Bad STA object")
if not isinstance(ap, FstAP):
raise Exception("Bad AP object to connect to")
sta.request("DISCONNECT")
if check_disconnect:
hap = ap.get_instance()
ev = hap.wait_event([ "AP-STA-DISCONNECTED" ], timeout=10)
if ev is None:
raise Exception("No disconnection event received from %s" % ap.get_ssid())
#
# FstDevice class
# This is the parent class for the AP (FstAP) and STA (FstSTA) that implements
# FST functionality.
#
class FstDevice:
def __init__(self, iface, fst_group, fst_pri, fst_llt=None):
self.iface = iface
self.fst_group = fst_group
self.fst_pri = fst_pri
self.fst_llt = fst_llt # None llt means no llt parameter will be set
self.instance = None # Hostapd/WpaSupplicant instance
self.peer_obj = None # Peer object, must be a FstDevice child object
self.new_peer_addr = None # Peer MAC address for new session iface
self.old_peer_addr = None # Peer MAC address for old session iface
self.role = 'initiator' # Role: initiator/responder
s = self.grequest("FST-MANAGER TEST_REQUEST IS_SUPPORTED")
if not s.startswith('OK'):
raise utils.HwsimSkip("FST not supported")
def ifname(self):
return self.iface
def get_instance(self):
"""Gets the Hostapd/WpaSupplicant instance"""
raise Exception("Virtual get_instance() called!")
def get_own_mac_address(self):
"""Gets the device's own MAC address"""
raise Exception("Virtual get_own_mac_address() called!")
def get_new_peer_addr(self):
return self.new_peer_addr
def get_old_peer_addr(self):
return self.old_peer_addr
def get_actual_peer_addr(self):
"""Gets the peer address. A connected AP/station address is returned."""
raise Exception("Virtual get_actual_peer_addr() called!")
def grequest(self, req):
"""Send request on the global control interface"""
raise Exception, "Virtual grequest() called!"
def wait_gevent(self, events, timeout=None):
"""Wait for a list of events on the global interface"""
raise Exception("Virtual wait_gevent() called!")
def request(self, req):
"""Issue a request to the control interface"""
h = self.get_instance()
return h.request(req)
def wait_event(self, events, timeout=None):
"""Wait for an event from the control interface"""
h = self.get_instance()
if timeout is not None:
return h.wait_event(events, timeout=timeout)
else:
return h.wait_event(events)
def set_old_peer_addr(self, peer_addr=None):
"""Sets the peer address"""
if peer_addr is not None:
self.old_peer_addr = peer_addr
else:
self.old_peer_addr = self.get_actual_peer_addr()
def set_new_peer_addr(self, peer_addr=None):
"""Sets the peer address"""
if peer_addr is not None:
self.new_peer_addr = peer_addr
else:
self.new_peer_addr = self.get_actual_peer_addr()
def add_peer(self, obj, old_peer_addr=None, new_peer_addr=None):
"""Add peer for FST session(s). 'obj' is a FstDevice subclass object.
The method must be called before add_session().
If peer_addr is not specified, the address of the currently connected
station is used."""
if not isinstance(obj, FstDevice):
raise Exception("Peer must be a FstDevice object")
self.peer_obj = obj
self.set_old_peer_addr(old_peer_addr)
self.set_new_peer_addr(new_peer_addr)
def get_peer(self):
"""Returns peer object"""
return self.peer_obj
def set_fst_parameters(self, group_id=None, pri=None, llt=None):
"""Change/set new FST parameters. Can be used to start FST sessions with
different FST parameters than defined in the configuration file."""
if group_id is not None:
self.fst_group = group_id
if pri is not None:
self.fst_pri = pri
if llt is not None:
self.fst_llt = llt
def get_local_mbies(self, ifname=None):
if_name = ifname if ifname is not None else self.iface
return self.grequest("FST-MANAGER TEST_REQUEST GET_LOCAL_MBIES " + if_name)
def add_session(self):
"""Adds an FST session. add_peer() must be called calling this
function"""
if self.peer_obj is None:
raise Exception("Peer wasn't added before starting session")
grp = ' ' + self.fst_group if self.fst_group != '' else ''
sid = self.grequest("FST-MANAGER SESSION_ADD" + grp)
sid = sid.strip()
if sid.startswith("FAIL"):
raise Exception("Cannot add FST session with groupid ==" + grp)
return sid
def set_session_param(self, params):
request = "FST-MANAGER SESSION_SET"
if params is not None and params != '':
request = request + ' ' + params
return self.grequest(request)
def configure_session(self, sid, new_iface, old_iface = None):
"""Calls session_set for a number of parameters some of which are stored
in "self" while others are passed to this function explicitly. If
old_iface is None, current iface is used; if old_iface is an empty
string."""
oldiface = old_iface if old_iface is not None else self.iface
s = self.set_session_param(sid + ' old_ifname=' + oldiface)
if not s.startswith("OK"):
raise Exception("Cannot set FST session old_ifname: " + s)
if new_iface is not None:
s = self.set_session_param(sid + " new_ifname=" + new_iface)
if not s.startswith("OK"):
raise Exception("Cannot set FST session new_ifname:" + s)
if self.new_peer_addr is not None and self.new_peer_addr != '':
s = self.set_session_param(sid + " new_peer_addr=" + self.new_peer_addr)
if not s.startswith("OK"):
raise Exception("Cannot set FST session peer address:" + s + " (new)")
if self.old_peer_addr is not None and self.old_peer_addr != '':
s = self.set_session_param(sid + " old_peer_addr=" + self.old_peer_addr)
if not s.startswith("OK"):
raise Exception("Cannot set FST session peer address:" + s + " (old)")
if self.fst_llt is not None and self.fst_llt != '':
s = self.set_session_param(sid + " llt=" + self.fst_llt)
if not s.startswith("OK"):
raise Exception("Cannot set FST session llt:" + s)
def send_iface_attach_request(self, ifname, group, llt, priority):
request = "FST-ATTACH " + ifname + ' ' + group
if llt is not None:
request += " llt=" + llt
if priority is not None:
request += " priority=" + priority
res = self.grequest(request)
if not res.startswith("OK"):
raise Exception("Cannot attach FST iface: " + res)
def send_iface_detach_request(self, ifname):
res = self.grequest("FST-DETACH " + ifname)
if not res.startswith("OK"):
raise Exception("Cannot detach FST iface: " + res)
def send_session_setup_request(self, sid):
s = self.grequest("FST-MANAGER SESSION_INITIATE " + sid)
if not s.startswith('OK'):
raise Exception("Cannot send setup request: %s" % s)
return s
def send_session_setup_response(self, sid, response):
request = "FST-MANAGER SESSION_RESPOND " + sid + " " + response
s = self.grequest(request)
if not s.startswith('OK'):
raise Exception("Cannot send setup response: %s" % s)
return s
def send_test_session_setup_request(self, fsts_id,
additional_parameter = None):
request = "FST-MANAGER TEST_REQUEST SEND_SETUP_REQUEST " + fsts_id
if additional_parameter is not None:
request += " " + additional_parameter
s = self.grequest(request)
if not s.startswith('OK'):
raise Exception("Cannot send FST setup request: %s" % s)
return s
def send_test_session_setup_response(self, fsts_id,
response, additional_parameter = None):
request = "FST-MANAGER TEST_REQUEST SEND_SETUP_RESPONSE " + fsts_id + " " + response
if additional_parameter is not None:
request += " " + additional_parameter
s = self.grequest(request)
if not s.startswith('OK'):
raise Exception("Cannot send FST setup response: %s" % s)
return s
def send_test_ack_request(self, fsts_id):
s = self.grequest("FST-MANAGER TEST_REQUEST SEND_ACK_REQUEST " + fsts_id)
if not s.startswith('OK'):
raise Exception("Cannot send FST ack request: %s" % s)
return s
def send_test_ack_response(self, fsts_id):
s = self.grequest("FST-MANAGER TEST_REQUEST SEND_ACK_RESPONSE " + fsts_id)
if not s.startswith('OK'):
raise Exception("Cannot send FST ack response: %s" % s)
return s
def send_test_tear_down(self, fsts_id):
s = self.grequest("FST-MANAGER TEST_REQUEST SEND_TEAR_DOWN " + fsts_id)
if not s.startswith('OK'):
raise Exception("Cannot send FST tear down: %s" % s)
return s
def get_fsts_id_by_sid(self, sid):
s = self.grequest("FST-MANAGER TEST_REQUEST GET_FSTS_ID " + sid)
if s == ' ' or s.startswith('FAIL'):
raise Exception("Cannot get fsts_id for sid == " % sid)
return int(s)
def wait_for_iface_event(self, timeout):
while True:
ev = self.wait_gevent(["FST-EVENT-IFACE"], timeout)
if ev is None:
raise Exception("No FST-EVENT-IFACE received")
event = parse_fst_iface_event(ev)
if event is None:
# We can't parse so it's not our event, wait for next one
continue
return event
def wait_for_session_event(self, timeout, events_to_ignore=[],
events_to_count=[]):
while True:
ev = self.wait_gevent(["FST-EVENT-SESSION"], timeout)
if ev is None:
raise Exception("No FST-EVENT-SESSION received")
event = parse_fst_session_event(ev)
if event is None:
# We can't parse so it's not our event, wait for next one
continue
if len(events_to_ignore) > 0:
if event['type'] in events_to_ignore:
continue
elif len(events_to_count) > 0:
if not event['type'] in events_to_count:
continue
return event
def initiate_session(self, sid, response="accept"):
"""Initiates FST session with given session id 'sid'.
'response' is the session respond answer: "accept", "reject", or a
special "timeout" value to skip the response in order to test session
timeouts.
Returns: "OK" - session has been initiated, otherwise the reason for the
reset: REASON_REJECT, REASON_STT."""
strsid = ' ' + sid if sid != '' else ''
s = self.grequest("FST-MANAGER SESSION_INITIATE"+ strsid)
if not s.startswith('OK'):
raise Exception("Cannot initiate fst session: %s" % s)
ev = self.peer_obj.wait_gevent([ "FST-EVENT-SESSION" ], timeout=5)
if ev is None:
raise Exception("No FST-EVENT-SESSION received")
# We got FST event
event = parse_fst_session_event(ev)
if event == None:
raise Exception("Unrecognized FST event: " % ev)
if event['type'] != 'EVENT_FST_SETUP':
raise Exception("Expected FST_SETUP event, got: " + event['type'])
ev = self.peer_obj.wait_gevent(["FST-EVENT-SESSION"], timeout=5)
if ev is None:
raise Exception("No FST-EVENT-SESSION received")
event = parse_fst_session_event(ev)
if event == None:
raise Exception("Unrecognized FST event: " % ev)
if event['type'] != 'EVENT_FST_SESSION_STATE':
raise Exception("Expected EVENT_FST_SESSION_STATE event, got: " + event['type'])
if event['new_state'] != "SETUP_COMPLETION":
raise Exception("Expected new state SETUP_COMPLETION, got: " + event['new_state'])
if response == '':
return 'OK'
if response != "timeout":
s = self.peer_obj.grequest("FST-MANAGER SESSION_RESPOND "+ event['id'] + " " + response) # Or reject
if not s.startswith('OK'):
raise Exception("Error session_respond: %s" % s)
# Wait for EVENT_FST_SESSION_STATE events. We should get at least 2
# events. The 1st event will be EVENT_FST_SESSION_STATE
# old_state=INITIAL new_state=SETUP_COMPLETED. The 2nd event will be
# either EVENT_FST_ESTABLISHED with the session id or
# EVENT_FST_SESSION_STATE with new_state=INITIAL if the session was
# reset, the reason field will tell why.
result = ''
while result == '':
ev = self.wait_gevent(["FST-EVENT-SESSION"], timeout=5)
if ev is None:
break # No session event received
event = parse_fst_session_event(ev)
if event == None:
# We can't parse so it's not our event, wait for next one
continue
if event['type'] == 'EVENT_FST_ESTABLISHED':
result = "OK"
break
elif event['type'] == "EVENT_FST_SESSION_STATE":
if event['new_state'] == "INITIAL":
# Session was reset, the only reason to get back to initial
# state.
result = event['reason']
break
if result == '':
raise Exception("No event for session respond")
return result
def transfer_session(self, sid):
"""Transfers the session. 'sid' is the session id. 'hsta' is the
station-responder object.
Returns: REASON_SWITCH - the session has been transferred successfully
or a REASON_... reported by the reset event."""
request = "FST-MANAGER SESSION_TRANSFER"
if sid != '':
request += ' ' + sid
s = self.grequest(request)
if not s.startswith('OK'):
raise Exception("Cannot transfer fst session: %s" % s)
result = ''
while result == '':
ev = self.peer_obj.wait_gevent([ "FST-EVENT-SESSION" ], timeout=5)
if ev is None:
raise Exception("Missing session transfer event")
# We got FST event. We expect TRANSITION_CONFIRMED state and then
# INITIAL (reset) with the reason (e.g. "REASON_SWITCH").
# Right now we'll be waiting for the reset event and record the
# reason.
event = parse_fst_session_event(ev)
if event == None:
raise Exception("Unrecognized FST event: " % ev)
if event['new_state'] == 'INITIAL':
result = event['reason']
return result
def wait_for_tear_down(self):
ev = self.wait_gevent([ "FST-EVENT-SESSION" ], timeout=5)
if ev is None:
raise Exception("No FST-EVENT-SESSION received")
# We got FST event
event = parse_fst_session_event(ev)
if event == None:
raise Exception("Unrecognized FST event: " % ev)
if event['type'] != 'EVENT_FST_SESSION_STATE':
raise Exception("Expected EVENT_FST_SESSION_STATE event, got: " + event['type'])
if event['new_state'] != "INITIAL":
raise Exception("Expected new state INITIAL, got: " + event['new_state'])
if event['reason'] != 'REASON_TEARDOWN':
raise Exception("Expected reason REASON_TEARDOWN, got: " + event['reason'])
def teardown_session(self, sid):
"""Tears down FST session with a given session id ('sid')"""
strsid = ' ' + sid if sid != '' else ''
s = self.grequest("FST-MANAGER SESSION_TEARDOWN" + strsid)
if not s.startswith('OK'):
raise Exception("Cannot tear down fst session: %s" % s)
self.peer_obj.wait_for_tear_down()
def remove_session(self, sid, wait_for_tear_down=True):
"""Removes FST session with a given session id ('sid')"""
strsid = ' ' + sid if sid != '' else ''
s = self.grequest("FST-MANAGER SESSION_REMOVE" + strsid)
if not s.startswith('OK'):
raise Exception("Cannot remove fst session: %s" % s)
if wait_for_tear_down == True:
self.peer_obj.wait_for_tear_down()
def remove_all_sessions(self):
"""Removes FST session with a given session id ('sid')"""
grp = ' ' + self.fst_group if self.fst_group != '' else ''
s = self.grequest("FST-MANAGER LIST_SESSIONS" + grp)
if not s.startswith('FAIL'):
for sid in s.splitlines():
sid = sid.strip()
if len(sid) != 0:
self.remove_session(sid, wait_for_tear_down=False)
#
# FstAP class
#
class FstAP (FstDevice):
def __init__(self, iface, ssid, mode, chan, fst_group, fst_pri,
fst_llt=None):
"""If fst_group is empty, then FST parameters will not be set
If fst_llt is empty, the parameter will not be set and the default value
is expected to be configured."""
self.ssid = ssid
self.mode = mode
self.chan = chan
self.reg_ctrl = fst_test_common.HapdRegCtrl()
self.reg_ctrl.add_ap(iface, self.chan)
self.global_instance = hostapd.HostapdGlobal()
FstDevice.__init__(self, iface, fst_group, fst_pri, fst_llt)
def start(self):
"""Starts AP the "standard" way as it was intended by hostapd tests.
This will work only when FST supports fully dynamically loading
parameters in hostapd."""
params = {}
params['ssid'] = self.ssid
params['hw_mode'] = self.mode
params['channel'] = self.chan
params['country_code'] = 'US'
self.hapd=hostapd.add_ap(self.iface, params)
if not self.hapd.ping():
raise Exception("Could not ping FST hostapd")
self.reg_ctrl.start()
self.get_global_instance()
if len(self.fst_group) != 0:
self.send_iface_attach_request(self.iface, self.fst_group,
self.fst_llt, self.fst_pri)
return self.hapd
def stop(self):
"""Removes the AP, To be used when dynamic fst APs are implemented in
hostapd."""
if len(self.fst_group) != 0:
self.remove_all_sessions()
self.send_iface_detach_request(self.iface)
self.reg_ctrl.stop()
del self.global_instance
self.global_instance = None
def get_instance(self):
"""Return the Hostapd/WpaSupplicant instance"""
if self.instance is None:
self.instance = hostapd.Hostapd(self.iface)
return self.instance
def get_global_instance(self):
return self.global_instance
def get_own_mac_address(self):
"""Gets the device's own MAC address"""
h = self.get_instance()
status = h.get_status()
return status['bssid[0]']
def get_actual_peer_addr(self):
"""Gets the peer address. A connected station address is returned."""
# Use the device instance, the global control interface doesn't have
# station address
h = self.get_instance()
sta = h.get_sta(None)
if sta is None or 'addr' not in sta:
# Maybe station is not connected?
addr = None
else:
addr=sta['addr']
return addr
def grequest(self, req):
"""Send request on the global control interface"""
logger.debug("FstAP::grequest: " + req)
h = self.get_global_instance()
return h.request(req)
def wait_gevent(self, events, timeout=None):
"""Wait for a list of events on the global interface"""
h = self.get_global_instance()
if timeout is not None:
return h.wait_event(events, timeout=timeout)
else:
return h.wait_event(events)
def get_ssid(self):
return self.ssid
#
# FstSTA class
#
class FstSTA (FstDevice):
def __init__(self, iface, fst_group, fst_pri, fst_llt=None):
"""If fst_group is empty, then FST parameters will not be set
If fst_llt is empty, the parameter will not be set and the default value
is expected to be configured."""
FstDevice.__init__(self, iface, fst_group, fst_pri, fst_llt)
self.connected = None # FstAP object the station is connected to
def start(self):
"""Current implementation involves running another instance of
wpa_supplicant with fixed FST STAs configurations. When any type of
dynamic STA loading is implemented, rewrite the function similarly to
FstAP."""
h = self.get_instance()
h.interface_add(self.iface, drv_params="force_connect_cmd=1")
if not h.global_ping():
raise Exception("Could not ping FST wpa_supplicant")
if len(self.fst_group) != 0:
self.send_iface_attach_request(self.iface, self.fst_group,
self.fst_llt, self.fst_pri)
return None
def stop(self):
"""Removes the STA. In a static (temporary) implementation does nothing,
the STA will be removed when the fst wpa_supplicant process is killed by
fstap.cleanup()."""
h = self.get_instance()
if len(self.fst_group) != 0:
self.remove_all_sessions()
self.send_iface_detach_request(self.iface)
h.interface_remove(self.iface)
h.close_ctrl()
del h
self.instance = None
def get_instance(self):
"""Return the Hostapd/WpaSupplicant instance"""
if self.instance is None:
self.instance = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
return self.instance
def get_own_mac_address(self):
"""Gets the device's own MAC address"""
h = self.get_instance()
status = h.get_status()
return status['address']
def get_actual_peer_addr(self):
"""Gets the peer address. A connected station address is returned"""
h = self.get_instance()
status = h.get_status()
return status['bssid']
def grequest(self, req):
"""Send request on the global control interface"""
logger.debug("FstSTA::grequest: " + req)
h = self.get_instance()
return h.global_request(req)
def wait_gevent(self, events, timeout=None):
"""Wait for a list of events on the global interface"""
h = self.get_instance()
if timeout is not None:
return h.wait_global_event(events, timeout=timeout)
else:
return h.wait_global_event(events)
def scan(self, freq=None, no_wait=False, only_new=False):
"""Issue Scan with given parameters. Returns the BSS dictionary for the
AP found (the 1st BSS found. TODO: What if the AP required is not the
1st in list?) or None if no BSS found. None call be also a result of
no_wait=True. Note, request("SCAN_RESULTS") can be used to get all the
results at once."""
h = self.get_instance()
h.scan(None, freq, no_wait, only_new)
r = h.get_bss('0')
return r
def connect(self, ap, **kwargs):
"""Connects to the given AP"""
if not isinstance(ap, FstAP):
raise Exception("Bad AP object to connect to")
h = self.get_instance()
hap = ap.get_instance()
h.connect(ap.get_ssid(), **kwargs)
self.connected = ap
def connect_to_external_ap(self, ap, ssid, check_connection=True, **kwargs):
"""Connects to the given external AP"""
if not isinstance(ap, hostapd.Hostapd):
raise Exception("Bad AP object to connect to")
h = self.get_instance()
h.connect(ssid, **kwargs)
self.connected = ap
if check_connection:
ev = ap.wait_event([ "AP-STA-CONNECTED" ], timeout=10)
if ev is None:
self.connected = None
raise Exception("No connection event received from %s" % ssid)
def disconnect(self, check_disconnect=True):
"""Disconnects from the AP the station is currently connected to"""
if self.connected is not None:
h = self.get_instance()
h.request("DISCONNECT")
if check_disconnect:
hap = self.connected.get_instance()
ev = hap.wait_event([ "AP-STA-DISCONNECTED" ], timeout=10)
if ev is None:
raise Exception("No disconnection event received from %s" % self.connected.get_ssid())
self.connected = None
def disconnect_from_external_ap(self, check_disconnect=True):
"""Disconnects from the external AP the station is currently connected
to"""
if self.connected is not None:
h = self.get_instance()
h.request("DISCONNECT")
if check_disconnect:
hap = self.connected
ev = hap.wait_event([ "AP-STA-DISCONNECTED" ], timeout=10)
if ev is None:
raise Exception("No disconnection event received from AP")
self.connected = None

View file

@ -0,0 +1,88 @@
# FST tests related definitions
# Copyright (c) 2015, Qualcomm Atheros, Inc.
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
import subprocess
import logging
import hostapd
logger = logging.getLogger()
fst_test_def_group='fstg0'
fst_test_def_freq_g='2412' # Channel 1
fst_test_def_freq_a='5180' # Channel 36
fst_test_def_chan_g='1'
fst_test_def_chan_a='36'
fst_test_def_prio_low='100'
fst_test_def_prio_high='110'
fst_test_def_llt='100'
fst_test_def_reg_domain='00'
class HapdRegCtrl:
def __init__(self):
self.refcnt = 0
self.ifname = None
self.changed = False
def __del__(self):
if self.refcnt != 0 and self.changed == True:
self.restore_reg_domain()
def start(self):
if self.ifname != None:
hapd = hostapd.Hostapd(self.ifname)
self.changed = self.wait_hapd_reg_change(hapd)
def stop(self):
if self.changed == True:
self.restore_reg_domain()
self.changed = False
def add_ap(self, ifname, chan):
if self.changed == False and self.channel_may_require_reg_change(chan):
self.ifname = ifname
@staticmethod
def channel_may_require_reg_change(chan):
if int(chan) > 14:
return True
return False
@staticmethod
def wait_hapd_reg_change(hapd):
state = hapd.get_status_field("state")
if state != "COUNTRY_UPDATE":
state = hapd.get_status_field("state")
if state != "ENABLED":
raise Exception("Unexpected interface state - expected COUNTRY_UPDATE")
else:
logger.debug("fst hostapd: regulatory domain already set")
return True
logger.debug("fst hostapd: waiting for regulatory domain to be set...")
ev = hapd.wait_event(["AP-ENABLED"], timeout=10)
if not ev:
raise Exception("AP setup timed out")
logger.debug("fst hostapd: regulatory domain set")
state = hapd.get_status_field("state")
if state != "ENABLED":
raise Exception("Unexpected interface state - expected ENABLED")
logger.debug("fst hostapd: regulatory domain ready")
return True
@staticmethod
def restore_reg_domain():
logger.debug("fst hostapd: waiting for regulatory domain to be restored...")
res = subprocess.call(['iw', 'reg', 'set', fst_test_def_reg_domain])
if res != 0:
raise Exception("Cannot restore regulatory domain")
logger.debug("fst hostapd: regulatory domain ready")

View file

@ -21,6 +21,28 @@ def mac2tuple(mac):
class HostapdGlobal:
def __init__(self):
self.ctrl = wpaspy.Ctrl(hapd_global)
self.mon = wpaspy.Ctrl(hapd_global)
self.mon.attach()
def request(self, cmd):
return self.ctrl.request(cmd)
def wait_event(self, events, timeout):
start = os.times()[4]
while True:
while self.mon.pending():
ev = self.mon.recv()
logger.debug("(global): " + ev)
for event in events:
if event in ev:
return ev
now = os.times()[4]
remaining = start + timeout - now
if remaining <= 0:
break
if not self.mon.pending(timeout=remaining):
break
return None
def add(self, ifname):
res = self.ctrl.request("ADD " + ifname + " " + hapd_ctrl)

View file

@ -487,6 +487,10 @@ def main():
wt = Wlantest()
rename_log(args.logdir, 'hwsim0.pcapng', name, wt)
rename_log(args.logdir, 'hwsim0', name, wt)
if os.path.exists(os.path.join(args.logdir, 'fst-wpa_supplicant')):
rename_log(args.logdir, 'fst-wpa_supplicant', name, None)
if os.path.exists(os.path.join(args.logdir, 'fst-hostapd')):
rename_log(args.logdir, 'fst-hostapd', name, None)
end = datetime.now()
diff = end - start

View file

@ -100,7 +100,8 @@ else
NUM_CH=1
fi
test -f /proc/modules && sudo modprobe mac80211_hwsim radios=6 channels=$NUM_CH support_p2p_device=0
test -f /proc/modules && sudo modprobe mac80211_hwsim radios=7 channels=$NUM_CH support_p2p_device=0
sudo ifconfig hwsim0 up
sudo $WLANTEST -i hwsim0 -n $LOGDIR/hwsim0.pcapng -c -dt -L $LOGDIR/hwsim0 &
for i in 0 1 2; do

View file

@ -0,0 +1,528 @@
# FST configuration tests
# Copyright (c) 2015, Qualcomm Atheros, Inc.
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
import logging
logger = logging.getLogger()
import subprocess
import time
import os
import signal
import hostapd
import wpasupplicant
import utils
import fst_test_common
class FstLauncherConfig:
"""FstLauncherConfig class represents configuration to be used for
FST config tests related hostapd/wpa_supplicant instances"""
def __init__(self, iface, fst_group, fst_pri, fst_llt=None):
self.iface = iface
self.fst_group = fst_group
self.fst_pri = fst_pri
self.fst_llt = fst_llt # None llt means no llt parameter will be set
def ifname(self):
return self.iface
def is_ap(self):
"""Returns True if the configuration is for AP, otherwise - False"""
raise Exception("Virtual is_ap() called!")
def to_file(self, pathname):
"""Creates configuration file to be used by FST config tests related
hostapd/wpa_supplicant instances"""
raise Exception("Virtual to_file() called!")
class FstLauncherConfigAP(FstLauncherConfig):
"""FstLauncherConfigAP class represents configuration to be used for
FST config tests related hostapd instance"""
def __init__(self, iface, ssid, mode, chan, fst_group, fst_pri,
fst_llt=None):
self.ssid = ssid
self.mode = mode
self.chan = chan
FstLauncherConfig.__init__(self, iface, fst_group, fst_pri, fst_llt)
def is_ap(self):
return True
def get_channel(self):
return self.chan
def to_file(self, pathname):
"""Creates configuration file to be used by FST config tests related
hostapd instance"""
with open(pathname, "w") as f:
f.write("country_code=US\n"
"interface=%s\n"
"ctrl_interface=/var/run/hostapd\n"
"ssid=%s\n"
"channel=%s\n"
"hw_mode=%s\n"
"ieee80211n=1\n" % (self.iface, self.ssid, self.chan,
self.mode))
if len(self.fst_group) != 0:
f.write("fst_group_id=%s\n"
"fst_priority=%s\n" % (self.fst_group, self.fst_pri))
if self.fst_llt is not None:
f.write("fst_llt=%s\n" % self.fst_llt)
with open(pathname, "r") as f:
logger.debug("wrote hostapd config file %s:\n%s" % (pathname,
f.read()))
class FstLauncherConfigSTA(FstLauncherConfig):
"""FstLauncherConfig class represents configuration to be used for
FST config tests related wpa_supplicant instance"""
def __init__(self, iface, fst_group, fst_pri, fst_llt=None):
FstLauncherConfig.__init__(self, iface, fst_group, fst_pri, fst_llt)
def is_ap(self):
return False
def to_file(self, pathname):
"""Creates configuration file to be used by FST config tests related
wpa_supplicant instance"""
with open(pathname, "w") as f:
f.write("ctrl_interface=DIR=/var/run/wpa_supplicant\n"
"p2p_no_group_iface=1\n")
if len(self.fst_group) != 0:
f.write("fst_group_id=%s\n"
"fst_priority=%s\n" % (self.fst_group, self.fst_pri))
if self.fst_llt is not None:
f.write("fst_llt=%s\n" % self.fst_llt)
with open(pathname, "r") as f:
logger.debug("wrote wpa_supplicant config file %s:\n%s" % (pathname, f.read()))
class FstLauncher:
"""FstLauncher class is responsible for launching and cleaning up of FST
config tests related hostapd/wpa_supplicant instances"""
def __init__(self, logpath):
self.logger = logging.getLogger()
self.fst_logpath = logpath
self.cfgs_to_run = []
self.hapd_fst_global = '/var/run/hostapd-fst-global'
self.wsup_fst_global = '/tmp/fststa'
self.nof_aps = 0
self.nof_stas = 0
self.reg_ctrl = fst_test_common.HapdRegCtrl()
self.test_is_supported()
def __del__(self):
self.cleanup()
@staticmethod
def test_is_supported():
h = hostapd.HostapdGlobal()
resp = h.request("FST-MANAGER TEST_REQUEST IS_SUPPORTED")
if not resp.startswith("OK"):
raise utils.HwsimSkip("FST not supported")
w = wpasupplicant.WpaSupplicant(global_iface='/tmp/wpas-wlan5')
resp = w.global_request("FST-MANAGER TEST_REQUEST IS_SUPPORTED")
if not resp.startswith("OK"):
raise utils.HwsimSkip("FST not supported")
def get_cfg_pathname(self, cfg):
"""Returns pathname of ifname based configuration file"""
return self.fst_logpath +'/'+ cfg.ifname() + '.conf'
def add_cfg(self, cfg):
"""Adds configuration to be used for launching hostapd/wpa_supplicant
instances"""
if cfg not in self.cfgs_to_run:
self.cfgs_to_run.append(cfg)
if cfg.is_ap() == True:
self.nof_aps += 1
else:
self.nof_stas += 1
def remove_cfg(self, cfg):
"""Removes configuration previously added with add_cfg"""
if cfg in self.cfgs_to_run:
self.cfgs_to_run.remove(cfg)
if cfg.is_ap() == True:
self.nof_aps -= 1
else:
self.nof_stas -= 1
config_file = self.get_cfg_pathname(cfg);
if os.path.exists(config_file):
os.remove(config_file)
def run_hostapd(self):
"""Lauches hostapd with interfaces configured according to
FstLauncherConfigAP configurations added"""
if self.nof_aps == 0:
raise Exception("No FST APs to start")
pidfile = self.fst_logpath + '/' + 'myhostapd.pid'
mylogfile = self.fst_logpath + '/' + 'fst-hostapd'
cmd = [ '../../hostapd/hostapd', '-B', '-ddd',
'-P', pidfile, '-f', mylogfile, '-g', self.hapd_fst_global]
for i in range(0, len(self.cfgs_to_run)):
cfg = self.cfgs_to_run[i]
if cfg.is_ap() == True:
cfgfile = self.get_cfg_pathname(cfg)
cfg.to_file(cfgfile)
cmd.append(cfgfile)
self.reg_ctrl.add_ap(cfg.ifname(), cfg.get_channel())
self.logger.debug("Starting fst hostapd: " + ' '.join(cmd))
res = subprocess.call(cmd)
self.logger.debug("fst hostapd start result: %d" % res)
if res == 0:
self.reg_ctrl.start()
return res
def run_wpa_supplicant(self):
"""Lauches wpa_supplicant with interfaces configured according to
FstLauncherConfigSTA configurations added"""
if self.nof_stas == 0:
raise Exception("No FST STAs to start")
pidfile = self.fst_logpath + '/' + 'mywpa_supplicant.pid'
mylogfile = self.fst_logpath + '/' + 'fst-wpa_supplicant'
cmd = [ '../../wpa_supplicant/wpa_supplicant', '-B', '-ddd',
'-P' + pidfile, '-f', mylogfile, '-g', self.wsup_fst_global ]
sta_no = 0
for i in range(0, len(self.cfgs_to_run)):
cfg = self.cfgs_to_run[i]
if cfg.is_ap() == False:
cfgfile = self.get_cfg_pathname(cfg)
cfg.to_file(cfgfile)
cmd.append('-c' + cfgfile)
cmd.append('-i' + cfg.ifname())
cmd.append('-Dnl80211')
if sta_no != self.nof_stas -1:
cmd.append('-N') # Next station configuration
sta_no += 1
self.logger.debug("Starting fst supplicant: " + ' '.join(cmd))
res = subprocess.call(cmd)
self.logger.debug("fst supplicant start result: %d" % res)
return res
def cleanup(self):
"""Terminates hostapd/wpa_supplicant processes previously launched with
run_hostapd/run_wpa_supplicant"""
pidfile = self.fst_logpath + '/' + 'myhostapd.pid'
self.kill_pid(pidfile)
pidfile = self.fst_logpath + '/' + 'mywpa_supplicant.pid'
self.kill_pid(pidfile)
self.reg_ctrl.stop()
while len(self.cfgs_to_run) != 0:
cfg = self.cfgs_to_run[0]
self.remove_cfg(cfg)
def kill_pid(self, pidfile):
"""Kills process by PID file"""
if not os.path.exists(pidfile):
return
pid = -1
try:
pf = file(pidfile, 'r')
pid = int(pf.read().strip())
pf.close()
self.logger.debug("kill_pid %s --> pid %d" % (pidfile, pid))
os.kill(pid, signal.SIGTERM)
for i in range(10):
try:
# Poll the pid (Is the process still existing?)
os.kill(pid, 0)
except OSError:
# No, already done
break
# Wait and check again
time.sleep(1)
except Exception, e:
self.logger.debug("Didn't stop the pid=%d. Was it stopped already? (%s)" % (pid, str(e)))
def parse_ies(iehex, el=-1):
"""Parses the information elements hex string 'iehex' in format
"0a0b0c0d0e0f". If no 'el' defined just checks the IE string for integrity.
If 'el' is defined returns the list of hex values of the specific IE (or
empty list if the element is not in the string."""
iel = [iehex[i:i + 2] for i in range(0, len(iehex), 2)]
for i in range(0, len(iel)):
iel[i] = int(iel[i], 16)
# Sanity check
i = 0
res = []
while i < len(iel):
logger.debug("IE found: %x" % iel[i])
if el != -1 and el == iel[i]:
res = iel[i + 2:i + 2 + iel[i + 1]]
i += 2 + iel[i + 1]
if i != len(iel):
logger.error("Bad IE string: " + iehex)
res = []
return res
def scan_and_get_bss(dev, frq):
"""Issues a scan on given device on given frequency, returns the bss info
dictionary ('ssid','ie','flags', etc.) or None. Note, the function
implies there is only one AP on the given channel. If not a case,
the function must be changed to call dev.get_bss() till the AP with the
[b]ssid that we need is found"""
dev.scan(freq=frq)
return dev.get_bss('0')
# AP configuration tests
def run_test_ap_configuration(apdev, test_params,
fst_group = fst_test_common.fst_test_def_group,
fst_pri = fst_test_common.fst_test_def_prio_high,
fst_llt = fst_test_common.fst_test_def_llt):
"""Runs FST hostapd where the 1st AP configuration is fixed, the 2nd fst
configuration is provided by the parameters. Returns the result of the run:
0 - no errors discovered, an error otherwise. The function is used for
simplek "bad configuration" tests."""
logdir = test_params['logdir']
fst_launcher = FstLauncher(logdir)
ap1 = FstLauncherConfigAP(apdev[0]['ifname'], 'fst_goodconf', 'a',
fst_test_common.fst_test_def_chan_a,
fst_test_common.fst_test_def_group,
fst_test_common.fst_test_def_prio_low,
fst_test_common.fst_test_def_llt)
ap2 = FstLauncherConfigAP(apdev[1]['ifname'], 'fst_badconf', 'b',
fst_test_common.fst_test_def_chan_g, fst_group,
fst_pri, fst_llt)
fst_launcher.add_cfg(ap1)
fst_launcher.add_cfg(ap2)
res = fst_launcher.run_hostapd()
return res
def run_test_sta_configuration(test_params,
fst_group = fst_test_common.fst_test_def_group,
fst_pri = fst_test_common.fst_test_def_prio_high,
fst_llt = fst_test_common.fst_test_def_llt):
"""Runs FST wpa_supplicant where the 1st STA configuration is fixed, the
2nd fst configuration is provided by the parameters. Returns the result of
the run: 0 - no errors discovered, an error otherwise. The function is used
for simple "bad configuration" tests."""
logdir = test_params['logdir']
fst_launcher = FstLauncher(logdir)
sta1 = FstLauncherConfigSTA('wlan5',
fst_test_common.fst_test_def_group,
fst_test_common.fst_test_def_prio_low,
fst_test_common.fst_test_def_llt)
sta2 = FstLauncherConfigSTA('wlan6', fst_group, fst_pri, fst_llt)
fst_launcher.add_cfg(sta1)
fst_launcher.add_cfg(sta2)
res = fst_launcher.run_wpa_supplicant()
return res
def test_fst_ap_config_llt_neg(dev, apdev, test_params):
"""FST AP configuration negative LLT"""
res = run_test_ap_configuration(apdev, test_params, fst_llt = '-1')
if res == 0:
raise Exception("hostapd started with a negative llt")
def test_fst_ap_config_llt_zero(dev, apdev, test_params):
"""FST AP configuration zero LLT"""
res = run_test_ap_configuration(apdev, test_params, fst_llt = '0')
if res == 0:
raise Exception("hostapd started with a zero llt")
def test_fst_ap_config_llt_too_big(dev, apdev, test_params):
"""FST AP configuration LLT is too big"""
res = run_test_ap_configuration(apdev, test_params,
fst_llt = '4294967296') #0x100000000
if res == 0:
raise Exception("hostapd started with llt that is too big")
def test_fst_ap_config_llt_nan(dev, apdev, test_params):
"""FST AP configuration LLT is not a number"""
res = run_test_ap_configuration(apdev, test_params, fst_llt = 'nan')
if res == 0:
raise Exception("hostapd started with llt not a number")
def test_fst_ap_config_pri_neg(dev, apdev, test_params):
"""FST AP configuration Priority negative"""
res = run_test_ap_configuration(apdev, test_params, fst_pri = '-1')
if res == 0:
raise Exception("hostapd started with a negative fst priority")
def test_fst_ap_config_pri_zero(dev, apdev, test_params):
"""FST AP configuration Priority zero"""
res = run_test_ap_configuration(apdev, test_params, fst_pri = '0')
if res == 0:
raise Exception("hostapd started with a zero fst priority")
def test_fst_ap_config_pri_large(dev, apdev, test_params):
"""FST AP configuration Priority too large"""
res = run_test_ap_configuration(apdev, test_params, fst_pri = '256')
if res == 0:
raise Exception("hostapd started with too large fst priority")
def test_fst_ap_config_pri_nan(dev, apdev, test_params):
"""FST AP configuration Priority not a number"""
res = run_test_ap_configuration(apdev, test_params, fst_pri = 'nan')
if res == 0:
raise Exception("hostapd started with fst priority not a number")
def test_fst_ap_config_group_len(dev, apdev, test_params):
"""FST AP configuration Group max length"""
res = run_test_ap_configuration(apdev, test_params,
fst_group = 'fstg5678abcd34567')
if res == 0:
raise Exception("hostapd started with fst_group length too big")
def test_fst_ap_config_good(dev, apdev, test_params):
"""FST AP configuration good parameters"""
res = run_test_ap_configuration(apdev, test_params)
if res != 0:
raise Exception("hostapd didn't start with valid config parameters")
def test_fst_ap_config_default(dev, apdev, test_params):
"""FST AP configuration default parameters"""
res = run_test_ap_configuration(apdev, test_params, fst_llt = None)
if res != 0:
raise Exception("hostapd didn't start with valid config parameters")
# STA configuration tests
def test_fst_sta_config_llt_neg(dev, apdev, test_params):
"""FST STA configuration negative LLT"""
res = run_test_sta_configuration(test_params, fst_llt = '-1')
if res == 0:
raise Exception("wpa_supplicant started with a negative llt")
def test_fst_sta_config_llt_zero(dev, apdev, test_params):
"""FST STA configuration zero LLT"""
res = run_test_sta_configuration(test_params, fst_llt = '0')
if res == 0:
raise Exception("wpa_supplicant started with a zero llt")
def test_fst_sta_config_llt_large(dev, apdev, test_params):
"""FST STA configuration LLT is too large"""
res = run_test_sta_configuration(test_params,
fst_llt = '4294967296') #0x100000000
if res == 0:
raise Exception("wpa_supplicant started with llt that is too large")
def test_fst_sta_config_llt_nan(dev, apdev, test_params):
"""FST STA configuration LLT is not a number"""
res = run_test_sta_configuration(test_params, fst_llt = 'nan')
if res == 0:
raise Exception("wpa_supplicant started with llt not a number")
def test_fst_sta_config_pri_neg(dev, apdev, test_params):
"""FST STA configuration Priority negative"""
res = run_test_sta_configuration(test_params, fst_pri = '-1')
if res == 0:
raise Exception("wpa_supplicant started with a negative fst priority")
def test_fst_sta_config_pri_zero(dev, apdev, test_params):
"""FST STA configuration Priority zero"""
res = run_test_sta_configuration(test_params, fst_pri = '0')
if res == 0:
raise Exception("wpa_supplicant started with a zero fst priority")
def test_fst_sta_config_pri_big(dev, apdev, test_params):
"""FST STA configuration Priority too large"""
res = run_test_sta_configuration(test_params, fst_pri = '256')
if res == 0:
raise Exception("wpa_supplicant started with too large fst priority")
def test_fst_sta_config_pri_nan(dev, apdev, test_params):
"""FST STA configuration Priority not a number"""
res = run_test_sta_configuration(test_params, fst_pri = 'nan')
if res == 0:
raise Exception("wpa_supplicant started with fst priority not a number")
def test_fst_sta_config_group_len(dev, apdev, test_params):
"""FST STA configuration Group max length"""
res = run_test_sta_configuration(test_params,
fst_group = 'fstg5678abcd34567')
if res == 0:
raise Exception("wpa_supplicant started with fst_group length too big")
def test_fst_sta_config_good(dev, apdev, test_params):
"""FST STA configuration good parameters"""
res = run_test_sta_configuration(test_params)
if res != 0:
raise Exception("wpa_supplicant didn't start with valid config parameters")
def test_fst_sta_config_default(dev, apdev, test_params):
"""FST STA configuration default parameters"""
res = run_test_sta_configuration(test_params, fst_llt = None)
if res != 0:
raise Exception("wpa_supplicant didn't start with valid config parameters")
def test_fst_scan_mb(dev, apdev, test_params):
"""FST scan valid MB IE presence with normal start"""
logdir = test_params['logdir']
# Test valid MB IE in scan results
fst_launcher = FstLauncher(logdir)
ap1 = FstLauncherConfigAP(apdev[0]['ifname'], 'fst_11a', 'a',
fst_test_common.fst_test_def_chan_a,
fst_test_common.fst_test_def_group,
fst_test_common.fst_test_def_prio_high)
ap2 = FstLauncherConfigAP(apdev[1]['ifname'], 'fst_11g', 'b',
fst_test_common.fst_test_def_chan_g,
fst_test_common.fst_test_def_group,
fst_test_common.fst_test_def_prio_low)
fst_launcher.add_cfg(ap1)
fst_launcher.add_cfg(ap2)
res = fst_launcher.run_hostapd()
if res != 0:
raise Exception("hostapd didn't start properly")
try:
mbie1=[]
flags1 = ''
mbie2=[]
flags2 = ''
# Scan 1st AP
vals1 = scan_and_get_bss(dev[0], fst_test_common.fst_test_def_freq_a)
if vals1 != None:
if 'ie' in vals1:
mbie1 = parse_ies(vals1['ie'], 0x9e)
if 'flags' in vals1:
flags1 = vals1['flags']
# Scan 2nd AP
vals2 = scan_and_get_bss(dev[2], fst_test_common.fst_test_def_freq_g)
if vals2 != None:
if 'ie' in vals2:
mbie2 = parse_ies(vals2['ie'],0x9e)
if 'flags' in vals2:
flags2 = vals2['flags']
finally:
fst_launcher.cleanup()
if len(mbie1) == 0:
raise Exception("No MB IE created by 1st AP")
if len(mbie2) == 0:
raise Exception("No MB IE created by 2nd AP")
def test_fst_scan_nomb(dev, apdev, test_params):
"""FST scan no MB IE presence with 1 AP start"""
logdir = test_params['logdir']
# Test valid MB IE in scan results
fst_launcher = FstLauncher(logdir)
ap1 = FstLauncherConfigAP(apdev[0]['ifname'], 'fst_11a', 'a',
fst_test_common.fst_test_def_chan_a,
fst_test_common.fst_test_def_group,
fst_test_common.fst_test_def_prio_high)
fst_launcher.add_cfg(ap1)
res = fst_launcher.run_hostapd()
if res != 0:
raise Exception("Hostapd didn't start properly")
try:
time.sleep(2)
mbie1=[]
flags1 = ''
vals1 = scan_and_get_bss(dev[0], fst_test_common.fst_test_def_freq_a)
if vals1 != None:
if 'ie' in vals1:
mbie1 = parse_ies(vals1['ie'], 0x9e)
if 'flags' in vals1:
flags1 = vals1['flags']
finally:
fst_launcher.cleanup()
if len(mbie1) != 0:
raise Exception("MB IE exists with 1 AP")

File diff suppressed because it is too large Load diff

View file

@ -109,7 +109,7 @@ kvm \
-fsdev local,security_model=none,id=fsdev-logs,path="$LOGDIR",writeout=immediate \
-device virtio-9p-pci,id=fs-logs,fsdev=fsdev-logs,mount_tag=logshare \
-monitor null -serial stdio -serial file:$LOGDIR/console \
-append "mac80211_hwsim.support_p2p_device=0 mac80211_hwsim.channels=$CHANNELS mac80211_hwsim.radios=6 init=$CMD testdir=$TESTDIR timewarp=$TIMEWARP console=$KVMOUT root=/dev/root rootflags=trans=virtio,version=9p2000.u ro rootfstype=9p EPATH=$EPATH ARGS=$RUN_TEST_ARGS"
-append "mac80211_hwsim.support_p2p_device=0 mac80211_hwsim.channels=$CHANNELS mac80211_hwsim.radios=7 init=$CMD testdir=$TESTDIR timewarp=$TIMEWARP console=$KVMOUT root=/dev/root rootflags=trans=virtio,version=9p2000.u ro rootfstype=9p EPATH=$EPATH ARGS=$RUN_TEST_ARGS"
if [ $CODECOV = "yes" ]; then
echo "Preparing code coverage reports"