54cf411f4c
This adds hwsim test ap_vlan_iface_cleanup_multibss. It connects two stations in different BSS but the same hostapd process. First both stations are in VLAN 1, then they get reauthenticated into VLAN 2. Due to the ordering of the stations moving around, this test checks that bridge and tagged interface referencing counting is done globally, such that the tagged interface is not removed too early and no bridge is left over. Signed-off-by: Michael Braun <michael-dev@fami-braun.de>
362 lines
11 KiB
Python
362 lines
11 KiB
Python
# Python class for controlling hostapd
|
|
# Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
|
|
#
|
|
# This software may be distributed under the terms of the BSD license.
|
|
# See README for more details.
|
|
|
|
import os
|
|
import time
|
|
import logging
|
|
import binascii
|
|
import struct
|
|
import wpaspy
|
|
|
|
logger = logging.getLogger()
|
|
hapd_ctrl = '/var/run/hostapd'
|
|
hapd_global = '/var/run/hostapd-global'
|
|
|
|
def mac2tuple(mac):
|
|
return struct.unpack('6B', binascii.unhexlify(mac.replace(':','')))
|
|
|
|
class HostapdGlobal:
|
|
def __init__(self):
|
|
self.ctrl = wpaspy.Ctrl(hapd_global)
|
|
|
|
def add(self, ifname):
|
|
res = self.ctrl.request("ADD " + ifname + " " + hapd_ctrl)
|
|
if not "OK" in res:
|
|
raise Exception("Could not add hostapd interface " + ifname)
|
|
|
|
def add_iface(self, ifname, confname):
|
|
res = self.ctrl.request("ADD " + ifname + " config=" + confname)
|
|
if not "OK" in res:
|
|
raise Exception("Could not add hostapd interface")
|
|
|
|
def add_bss(self, phy, confname, ignore_error=False):
|
|
res = self.ctrl.request("ADD bss_config=" + phy + ":" + confname)
|
|
if not "OK" in res:
|
|
if not ignore_error:
|
|
raise Exception("Could not add hostapd BSS")
|
|
|
|
def remove(self, ifname):
|
|
self.ctrl.request("REMOVE " + ifname, timeout=30)
|
|
|
|
def relog(self):
|
|
self.ctrl.request("RELOG")
|
|
|
|
def flush(self):
|
|
self.ctrl.request("FLUSH")
|
|
|
|
|
|
class Hostapd:
|
|
def __init__(self, ifname, bssidx=0):
|
|
self.ifname = ifname
|
|
self.ctrl = wpaspy.Ctrl(os.path.join(hapd_ctrl, ifname))
|
|
self.mon = wpaspy.Ctrl(os.path.join(hapd_ctrl, ifname))
|
|
self.mon.attach()
|
|
self.bssid = None
|
|
self.bssidx = bssidx
|
|
|
|
def own_addr(self):
|
|
if self.bssid is None:
|
|
self.bssid = self.get_status_field('bssid[%d]' % self.bssidx)
|
|
return self.bssid
|
|
|
|
def request(self, cmd):
|
|
logger.debug(self.ifname + ": CTRL: " + cmd)
|
|
return self.ctrl.request(cmd)
|
|
|
|
def ping(self):
|
|
return "PONG" in self.request("PING")
|
|
|
|
def set(self, field, value):
|
|
if not "OK" in self.request("SET " + field + " " + value):
|
|
raise Exception("Failed to set hostapd parameter " + field)
|
|
|
|
def set_defaults(self):
|
|
self.set("driver", "nl80211")
|
|
self.set("hw_mode", "g")
|
|
self.set("channel", "1")
|
|
self.set("ieee80211n", "1")
|
|
self.set("logger_stdout", "-1")
|
|
self.set("logger_stdout_level", "0")
|
|
|
|
def set_open(self, ssid):
|
|
self.set_defaults()
|
|
self.set("ssid", ssid)
|
|
|
|
def set_wpa2_psk(self, ssid, passphrase):
|
|
self.set_defaults()
|
|
self.set("ssid", ssid)
|
|
self.set("wpa_passphrase", passphrase)
|
|
self.set("wpa", "2")
|
|
self.set("wpa_key_mgmt", "WPA-PSK")
|
|
self.set("rsn_pairwise", "CCMP")
|
|
|
|
def set_wpa_psk(self, ssid, passphrase):
|
|
self.set_defaults()
|
|
self.set("ssid", ssid)
|
|
self.set("wpa_passphrase", passphrase)
|
|
self.set("wpa", "1")
|
|
self.set("wpa_key_mgmt", "WPA-PSK")
|
|
self.set("wpa_pairwise", "TKIP")
|
|
|
|
def set_wpa_psk_mixed(self, ssid, passphrase):
|
|
self.set_defaults()
|
|
self.set("ssid", ssid)
|
|
self.set("wpa_passphrase", passphrase)
|
|
self.set("wpa", "3")
|
|
self.set("wpa_key_mgmt", "WPA-PSK")
|
|
self.set("wpa_pairwise", "TKIP")
|
|
self.set("rsn_pairwise", "CCMP")
|
|
|
|
def set_wep(self, ssid, key):
|
|
self.set_defaults()
|
|
self.set("ssid", ssid)
|
|
self.set("wep_key0", key)
|
|
|
|
def enable(self):
|
|
if not "OK" in self.request("ENABLE"):
|
|
raise Exception("Failed to enable hostapd interface " + self.ifname)
|
|
|
|
def disable(self):
|
|
if not "OK" in self.request("DISABLE"):
|
|
raise Exception("Failed to disable hostapd interface " + self.ifname)
|
|
|
|
def dump_monitor(self):
|
|
while self.mon.pending():
|
|
ev = self.mon.recv()
|
|
logger.debug(self.ifname + ": " + ev)
|
|
|
|
def wait_event(self, events, timeout):
|
|
start = os.times()[4]
|
|
while True:
|
|
while self.mon.pending():
|
|
ev = self.mon.recv()
|
|
logger.debug(self.ifname + ": " + ev)
|
|
for event in events:
|
|
if event in ev:
|
|
return ev
|
|
now = os.times()[4]
|
|
remaining = start + timeout - now
|
|
if remaining <= 0:
|
|
break
|
|
if not self.mon.pending(timeout=remaining):
|
|
break
|
|
return None
|
|
|
|
def get_status(self):
|
|
res = self.request("STATUS")
|
|
lines = res.splitlines()
|
|
vals = dict()
|
|
for l in lines:
|
|
[name,value] = l.split('=', 1)
|
|
vals[name] = value
|
|
return vals
|
|
|
|
def get_status_field(self, field):
|
|
vals = self.get_status()
|
|
if field in vals:
|
|
return vals[field]
|
|
return None
|
|
|
|
def get_driver_status(self):
|
|
res = self.request("STATUS-DRIVER")
|
|
lines = res.splitlines()
|
|
vals = dict()
|
|
for l in lines:
|
|
[name,value] = l.split('=', 1)
|
|
vals[name] = value
|
|
return vals
|
|
|
|
def get_driver_status_field(self, field):
|
|
vals = self.get_driver_status()
|
|
if field in vals:
|
|
return vals[field]
|
|
return None
|
|
|
|
def get_config(self):
|
|
res = self.request("GET_CONFIG")
|
|
lines = res.splitlines()
|
|
vals = dict()
|
|
for l in lines:
|
|
[name,value] = l.split('=', 1)
|
|
vals[name] = value
|
|
return vals
|
|
|
|
def mgmt_rx(self, timeout=5):
|
|
ev = self.wait_event(["MGMT-RX"], timeout=timeout)
|
|
if ev is None:
|
|
return None
|
|
msg = {}
|
|
frame = binascii.unhexlify(ev.split(' ')[1])
|
|
msg['frame'] = frame
|
|
|
|
hdr = struct.unpack('<HH6B6B6BH', frame[0:24])
|
|
msg['fc'] = hdr[0]
|
|
msg['subtype'] = (hdr[0] >> 4) & 0xf
|
|
hdr = hdr[1:]
|
|
msg['duration'] = hdr[0]
|
|
hdr = hdr[1:]
|
|
msg['da'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
|
|
hdr = hdr[6:]
|
|
msg['sa'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
|
|
hdr = hdr[6:]
|
|
msg['bssid'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
|
|
hdr = hdr[6:]
|
|
msg['seq_ctrl'] = hdr[0]
|
|
msg['payload'] = frame[24:]
|
|
|
|
return msg
|
|
|
|
def mgmt_tx(self, msg):
|
|
t = (msg['fc'], 0) + mac2tuple(msg['da']) + mac2tuple(msg['sa']) + mac2tuple(msg['bssid']) + (0,)
|
|
hdr = struct.pack('<HH6B6B6BH', *t)
|
|
self.request("MGMT_TX " + binascii.hexlify(hdr + msg['payload']))
|
|
|
|
def get_sta(self, addr, info=None, next=False):
|
|
cmd = "STA-NEXT " if next else "STA "
|
|
if addr is None:
|
|
res = self.request("STA-FIRST")
|
|
elif info:
|
|
res = self.request(cmd + addr + " " + info)
|
|
else:
|
|
res = self.request(cmd + addr)
|
|
lines = res.splitlines()
|
|
vals = dict()
|
|
first = True
|
|
for l in lines:
|
|
if first:
|
|
vals['addr'] = l
|
|
first = False
|
|
else:
|
|
[name,value] = l.split('=', 1)
|
|
vals[name] = value
|
|
return vals
|
|
|
|
def get_mib(self, param=None):
|
|
if param:
|
|
res = self.request("MIB " + param)
|
|
else:
|
|
res = self.request("MIB")
|
|
lines = res.splitlines()
|
|
vals = dict()
|
|
for l in lines:
|
|
name_val = l.split('=', 1)
|
|
if len(name_val) > 1:
|
|
vals[name_val[0]] = name_val[1]
|
|
return vals
|
|
|
|
def add_ap(ifname, params, wait_enabled=True, no_enable=False):
|
|
logger.info("Starting AP " + ifname)
|
|
hapd_global = HostapdGlobal()
|
|
hapd_global.remove(ifname)
|
|
hapd_global.add(ifname)
|
|
hapd = Hostapd(ifname)
|
|
if not hapd.ping():
|
|
raise Exception("Could not ping hostapd")
|
|
hapd.set_defaults()
|
|
fields = [ "ssid", "wpa_passphrase", "nas_identifier", "wpa_key_mgmt",
|
|
"wpa",
|
|
"wpa_pairwise", "rsn_pairwise", "auth_server_addr",
|
|
"acct_server_addr", "osu_server_uri" ]
|
|
for field in fields:
|
|
if field in params:
|
|
hapd.set(field, params[field])
|
|
for f,v in params.items():
|
|
if f in fields:
|
|
continue
|
|
if isinstance(v, list):
|
|
for val in v:
|
|
hapd.set(f, val)
|
|
else:
|
|
hapd.set(f, v)
|
|
if no_enable:
|
|
return hapd
|
|
hapd.enable()
|
|
if wait_enabled:
|
|
ev = hapd.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=30)
|
|
if ev is None:
|
|
raise Exception("AP startup timed out")
|
|
if "AP-ENABLED" not in ev:
|
|
raise Exception("AP startup failed")
|
|
return hapd
|
|
|
|
def add_bss(phy, ifname, confname, ignore_error=False):
|
|
logger.info("Starting BSS phy=" + phy + " ifname=" + ifname)
|
|
hapd_global = HostapdGlobal()
|
|
hapd_global.add_bss(phy, confname, ignore_error)
|
|
hapd = Hostapd(ifname)
|
|
if not hapd.ping():
|
|
raise Exception("Could not ping hostapd")
|
|
|
|
def add_iface(ifname, confname):
|
|
logger.info("Starting interface " + ifname)
|
|
hapd_global = HostapdGlobal()
|
|
hapd_global.add_iface(ifname, confname)
|
|
hapd = Hostapd(ifname)
|
|
if not hapd.ping():
|
|
raise Exception("Could not ping hostapd")
|
|
|
|
def remove_bss(ifname):
|
|
logger.info("Removing BSS " + ifname)
|
|
hapd_global = HostapdGlobal()
|
|
hapd_global.remove(ifname)
|
|
|
|
def wpa2_params(ssid=None, passphrase=None):
|
|
params = { "wpa": "2",
|
|
"wpa_key_mgmt": "WPA-PSK",
|
|
"rsn_pairwise": "CCMP" }
|
|
if ssid:
|
|
params["ssid"] = ssid
|
|
if passphrase:
|
|
params["wpa_passphrase"] = passphrase
|
|
return params
|
|
|
|
def wpa_params(ssid=None, passphrase=None):
|
|
params = { "wpa": "1",
|
|
"wpa_key_mgmt": "WPA-PSK",
|
|
"wpa_pairwise": "TKIP" }
|
|
if ssid:
|
|
params["ssid"] = ssid
|
|
if passphrase:
|
|
params["wpa_passphrase"] = passphrase
|
|
return params
|
|
|
|
def wpa_mixed_params(ssid=None, passphrase=None):
|
|
params = { "wpa": "3",
|
|
"wpa_key_mgmt": "WPA-PSK",
|
|
"wpa_pairwise": "TKIP",
|
|
"rsn_pairwise": "CCMP" }
|
|
if ssid:
|
|
params["ssid"] = ssid
|
|
if passphrase:
|
|
params["wpa_passphrase"] = passphrase
|
|
return params
|
|
|
|
def radius_params():
|
|
params = { "auth_server_addr": "127.0.0.1",
|
|
"auth_server_port": "1812",
|
|
"auth_server_shared_secret": "radius",
|
|
"nas_identifier": "nas.w1.fi" }
|
|
return params
|
|
|
|
def wpa_eap_params(ssid=None):
|
|
params = radius_params()
|
|
params["wpa"] = "1"
|
|
params["wpa_key_mgmt"] = "WPA-EAP"
|
|
params["wpa_pairwise"] = "TKIP"
|
|
params["ieee8021x"] = "1"
|
|
if ssid:
|
|
params["ssid"] = ssid
|
|
return params
|
|
|
|
def wpa2_eap_params(ssid=None):
|
|
params = radius_params()
|
|
params["wpa"] = "2"
|
|
params["wpa_key_mgmt"] = "WPA-EAP"
|
|
params["rsn_pairwise"] = "CCMP"
|
|
params["ieee8021x"] = "1"
|
|
if ssid:
|
|
params["ssid"] = ssid
|
|
return params
|