hostap/tests/hwsim/hostapd.py
Jouni Malinen b47750beba tests: Add test cases for automatic channel selection
This extends the Hostapd class to support monitor interface events and
STATUS command similarly to the WpaSupplicant class so that internal
hostapd state can be verified in more detail.

Signed-hostap: Jouni Malinen <j@w1.fi>
2013-11-03 21:30:31 +02:00

241 lines
7.3 KiB
Python

#!/usr/bin/python
#
# Python class for controlling hostapd
# Copyright (c) 2013, Jouni Malinen <j@w1.fi>
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
import os
import time
import logging
import wpaspy
logger = logging.getLogger()
hapd_ctrl = '/var/run/hostapd'
hapd_global = '/var/run/hostapd-global'
class HostapdGlobal:
def __init__(self):
self.ctrl = wpaspy.Ctrl(hapd_global)
def add(self, ifname):
res = self.ctrl.request("ADD " + ifname + " " + hapd_ctrl)
if not "OK" in res:
raise Exception("Could not add hostapd interface " + ifname)
def add_iface(self, ifname, confname):
res = self.ctrl.request("ADD " + ifname + " config=" + confname)
if not "OK" in res:
raise Exception("Could not add hostapd interface")
def add_bss(self, phy, confname, ignore_error=False):
res = self.ctrl.request("ADD bss_config=" + phy + ":" + confname)
if not "OK" in res:
if not ignore_error:
raise Exception("Could not add hostapd BSS")
def remove(self, ifname):
self.ctrl.request("REMOVE " + ifname)
def relog(self):
self.ctrl.request("RELOG")
class Hostapd:
def __init__(self, ifname):
self.ifname = ifname
self.ctrl = wpaspy.Ctrl(os.path.join(hapd_ctrl, ifname))
self.mon = wpaspy.Ctrl(os.path.join(hapd_ctrl, ifname))
self.mon.attach()
def request(self, cmd):
logger.debug(self.ifname + ": CTRL: " + cmd)
return self.ctrl.request(cmd)
def ping(self):
return "PONG" in self.request("PING")
def set(self, field, value):
logger.debug(self.ifname + ": SET " + field + "=" + value)
if not "OK" in self.request("SET " + field + " " + value):
raise Exception("Failed to set hostapd parameter " + field)
def set_defaults(self):
self.set("driver", "nl80211")
self.set("hw_mode", "g")
self.set("channel", "1")
self.set("ieee80211n", "1")
self.set("logger_stdout", "-1")
self.set("logger_stdout_level", "0")
def set_open(self, ssid):
self.set_defaults()
self.set("ssid", ssid)
def set_wpa2_psk(self, ssid, passphrase):
self.set_defaults()
self.set("ssid", ssid)
self.set("wpa_passphrase", passphrase)
self.set("wpa", "2")
self.set("wpa_key_mgmt", "WPA-PSK")
self.set("rsn_pairwise", "CCMP")
def set_wpa_psk(self, ssid, passphrase):
self.set_defaults()
self.set("ssid", ssid)
self.set("wpa_passphrase", passphrase)
self.set("wpa", "1")
self.set("wpa_key_mgmt", "WPA-PSK")
self.set("wpa_pairwise", "TKIP")
def set_wpa_psk_mixed(self, ssid, passphrase):
self.set_defaults()
self.set("ssid", ssid)
self.set("wpa_passphrase", passphrase)
self.set("wpa", "3")
self.set("wpa_key_mgmt", "WPA-PSK")
self.set("wpa_pairwise", "TKIP")
self.set("rsn_pairwise", "CCMP")
def set_wep(self, ssid, key):
self.set_defaults()
self.set("ssid", ssid)
self.set("wep_key0", key)
def enable(self):
if not "OK" in self.ctrl.request("ENABLE"):
raise Exception("Failed to enable hostapd interface " + self.ifname)
def disable(self):
if not "OK" in self.ctrl.request("ENABLE"):
raise Exception("Failed to disable hostapd interface " + self.ifname)
def dump_monitor(self):
while self.mon.pending():
ev = self.mon.recv()
logger.debug(self.ifname + ": " + ev)
def wait_event(self, events, timeout):
count = 0
while count < timeout * 10:
count = count + 1
time.sleep(0.1)
while self.mon.pending():
ev = self.mon.recv()
logger.debug(self.ifname + ": " + ev)
for event in events:
if event in ev:
return ev
return None
def get_status(self):
res = self.request("STATUS")
lines = res.splitlines()
vals = dict()
for l in lines:
[name,value] = l.split('=', 1)
vals[name] = value
return vals
def get_status_field(self, field):
vals = self.get_status()
if field in vals:
return vals[field]
return None
def add_ap(ifname, params):
logger.info("Starting AP " + ifname)
hapd_global = HostapdGlobal()
hapd_global.remove(ifname)
hapd_global.add(ifname)
hapd = Hostapd(ifname)
if not hapd.ping():
raise Exception("Could not ping hostapd")
hapd.set_defaults()
fields = [ "ssid", "wpa_passphrase", "nas_identifier", "wpa_key_mgmt",
"wpa",
"wpa_pairwise", "rsn_pairwise", "auth_server_addr" ]
for field in fields:
if field in params:
hapd.set(field, params[field])
for f,v in params.items():
if f in fields:
continue
if isinstance(v, list):
for val in v:
hapd.set(f, val)
else:
hapd.set(f, v)
hapd.enable()
return hapd
def add_bss(phy, ifname, confname, ignore_error=False):
logger.info("Starting BSS phy=" + phy + " ifname=" + ifname)
hapd_global = HostapdGlobal()
hapd_global.add_bss(phy, confname, ignore_error)
hapd = Hostapd(ifname)
if not hapd.ping():
raise Exception("Could not ping hostapd")
def add_iface(ifname, confname):
logger.info("Starting interface " + ifname)
hapd_global = HostapdGlobal()
hapd_global.add_iface(ifname, confname)
hapd = Hostapd(ifname)
if not hapd.ping():
raise Exception("Could not ping hostapd")
def remove_bss(ifname):
logger.info("Removing BSS " + ifname)
hapd_global = HostapdGlobal()
hapd_global.remove(ifname)
def wpa2_params(ssid=None, passphrase=None):
params = { "wpa": "2",
"wpa_key_mgmt": "WPA-PSK",
"rsn_pairwise": "CCMP" }
if ssid:
params["ssid"] = ssid
if passphrase:
params["wpa_passphrase"] = passphrase
return params
def wpa_params(ssid=None, passphrase=None):
params = { "wpa": "1",
"wpa_key_mgmt": "WPA-PSK",
"wpa_pairwise": "TKIP" }
if ssid:
params["ssid"] = ssid
if passphrase:
params["wpa_passphrase"] = passphrase
return params
def wpa_mixed_params(ssid=None, passphrase=None):
params = { "wpa": "3",
"wpa_key_mgmt": "WPA-PSK",
"wpa_pairwise": "TKIP",
"rsn_pairwise": "CCMP" }
if ssid:
params["ssid"] = ssid
if passphrase:
params["wpa_passphrase"] = passphrase
return params
def radius_params():
params = { "auth_server_addr": "127.0.0.1",
"auth_server_port": "1812",
"auth_server_shared_secret": "radius",
"nas_identifier": "nas.w1.fi" }
return params
def wpa2_eap_params(ssid=None):
params = radius_params()
params["wpa"] = "2"
params["wpa_key_mgmt"] = "WPA-EAP"
params["rsn_pairwise"] = "CCMP"
params["ieee8021x"] = "1"
if ssid:
params["ssid"] = ssid
return params