 57ff37d0e8
			
		
	
	
		57ff37d0e8
		
	
	
	
	
		
			
			At some point, these hostapd_oom_* test cases started to fail with wpa_msg() allocation failure for the AP-ENABLED event. This resulted in unnecessary long test execution (waiting 30 seconds for an event that was dropped). Speed this up by using a shorter timeout. Signed-off-by: Jouni Malinen <j@w1.fi>
		
			
				
	
	
		
			390 lines
		
	
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			390 lines
		
	
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Python class for controlling hostapd
 | |
| # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
 | |
| #
 | |
| # This software may be distributed under the terms of the BSD license.
 | |
| # See README for more details.
 | |
| 
 | |
| import os
 | |
| import time
 | |
| import logging
 | |
| import binascii
 | |
| import struct
 | |
| import wpaspy
 | |
| 
 | |
| logger = logging.getLogger()
 | |
| hapd_ctrl = '/var/run/hostapd'
 | |
| hapd_global = '/var/run/hostapd-global'
 | |
| 
 | |
| def mac2tuple(mac):
 | |
|     return struct.unpack('6B', binascii.unhexlify(mac.replace(':','')))
 | |
| 
 | |
| class HostapdGlobal:
 | |
|     def __init__(self):
 | |
|         self.ctrl = wpaspy.Ctrl(hapd_global)
 | |
|         self.mon = wpaspy.Ctrl(hapd_global)
 | |
|         self.mon.attach()
 | |
| 
 | |
|     def request(self, cmd):
 | |
|         return self.ctrl.request(cmd)
 | |
| 
 | |
|     def wait_event(self, events, timeout):
 | |
|         start = os.times()[4]
 | |
|         while True:
 | |
|             while self.mon.pending():
 | |
|                 ev = self.mon.recv()
 | |
|                 logger.debug("(global): " + ev)
 | |
|                 for event in events:
 | |
|                     if event in ev:
 | |
|                         return ev
 | |
|             now = os.times()[4]
 | |
|             remaining = start + timeout - now
 | |
|             if remaining <= 0:
 | |
|                 break
 | |
|             if not self.mon.pending(timeout=remaining):
 | |
|                 break
 | |
|         return None
 | |
| 
 | |
|     def request(self, cmd):
 | |
|         return self.ctrl.request(cmd)
 | |
| 
 | |
|     def add(self, ifname, driver=None):
 | |
|         cmd = "ADD " + ifname + " " + hapd_ctrl
 | |
|         if driver:
 | |
|             cmd += " " + driver
 | |
|         res = self.ctrl.request(cmd)
 | |
|         if not "OK" in res:
 | |
|             raise Exception("Could not add hostapd interface " + ifname)
 | |
| 
 | |
|     def add_iface(self, ifname, confname):
 | |
|         res = self.ctrl.request("ADD " + ifname + " config=" + confname)
 | |
|         if not "OK" in res:
 | |
|             raise Exception("Could not add hostapd interface")
 | |
| 
 | |
|     def add_bss(self, phy, confname, ignore_error=False):
 | |
|         res = self.ctrl.request("ADD bss_config=" + phy + ":" + confname)
 | |
|         if not "OK" in res:
 | |
|             if not ignore_error:
 | |
|                 raise Exception("Could not add hostapd BSS")
 | |
| 
 | |
|     def remove(self, ifname):
 | |
|         self.ctrl.request("REMOVE " + ifname, timeout=30)
 | |
| 
 | |
|     def relog(self):
 | |
|         self.ctrl.request("RELOG")
 | |
| 
 | |
|     def flush(self):
 | |
|         self.ctrl.request("FLUSH")
 | |
| 
 | |
| 
 | |
| class Hostapd:
 | |
|     def __init__(self, ifname, bssidx=0):
 | |
|         self.ifname = ifname
 | |
|         self.ctrl = wpaspy.Ctrl(os.path.join(hapd_ctrl, ifname))
 | |
|         self.mon = wpaspy.Ctrl(os.path.join(hapd_ctrl, ifname))
 | |
|         self.mon.attach()
 | |
|         self.bssid = None
 | |
|         self.bssidx = bssidx
 | |
| 
 | |
|     def own_addr(self):
 | |
|         if self.bssid is None:
 | |
|             self.bssid = self.get_status_field('bssid[%d]' % self.bssidx)
 | |
|         return self.bssid
 | |
| 
 | |
|     def request(self, cmd):
 | |
|         logger.debug(self.ifname + ": CTRL: " + cmd)
 | |
|         return self.ctrl.request(cmd)
 | |
| 
 | |
|     def ping(self):
 | |
|         return "PONG" in self.request("PING")
 | |
| 
 | |
|     def set(self, field, value):
 | |
|         if not "OK" in self.request("SET " + field + " " + value):
 | |
|             raise Exception("Failed to set hostapd parameter " + field)
 | |
| 
 | |
|     def set_defaults(self):
 | |
|         self.set("driver", "nl80211")
 | |
|         self.set("hw_mode", "g")
 | |
|         self.set("channel", "1")
 | |
|         self.set("ieee80211n", "1")
 | |
|         self.set("logger_stdout", "-1")
 | |
|         self.set("logger_stdout_level", "0")
 | |
| 
 | |
|     def set_open(self, ssid):
 | |
|         self.set_defaults()
 | |
|         self.set("ssid", ssid)
 | |
| 
 | |
|     def set_wpa2_psk(self, ssid, passphrase):
 | |
|         self.set_defaults()
 | |
|         self.set("ssid", ssid)
 | |
|         self.set("wpa_passphrase", passphrase)
 | |
|         self.set("wpa", "2")
 | |
|         self.set("wpa_key_mgmt", "WPA-PSK")
 | |
|         self.set("rsn_pairwise", "CCMP")
 | |
| 
 | |
|     def set_wpa_psk(self, ssid, passphrase):
 | |
|         self.set_defaults()
 | |
|         self.set("ssid", ssid)
 | |
|         self.set("wpa_passphrase", passphrase)
 | |
|         self.set("wpa", "1")
 | |
|         self.set("wpa_key_mgmt", "WPA-PSK")
 | |
|         self.set("wpa_pairwise", "TKIP")
 | |
| 
 | |
|     def set_wpa_psk_mixed(self, ssid, passphrase):
 | |
|         self.set_defaults()
 | |
|         self.set("ssid", ssid)
 | |
|         self.set("wpa_passphrase", passphrase)
 | |
|         self.set("wpa", "3")
 | |
|         self.set("wpa_key_mgmt", "WPA-PSK")
 | |
|         self.set("wpa_pairwise", "TKIP")
 | |
|         self.set("rsn_pairwise", "CCMP")
 | |
| 
 | |
|     def set_wep(self, ssid, key):
 | |
|         self.set_defaults()
 | |
|         self.set("ssid", ssid)
 | |
|         self.set("wep_key0", key)
 | |
| 
 | |
|     def enable(self):
 | |
|         if not "OK" in self.request("ENABLE"):
 | |
|             raise Exception("Failed to enable hostapd interface " + self.ifname)
 | |
| 
 | |
|     def disable(self):
 | |
|         if not "OK" in self.request("DISABLE"):
 | |
|             raise Exception("Failed to disable hostapd interface " + self.ifname)
 | |
| 
 | |
|     def dump_monitor(self):
 | |
|         while self.mon.pending():
 | |
|             ev = self.mon.recv()
 | |
|             logger.debug(self.ifname + ": " + ev)
 | |
| 
 | |
|     def wait_event(self, events, timeout):
 | |
|         start = os.times()[4]
 | |
|         while True:
 | |
|             while self.mon.pending():
 | |
|                 ev = self.mon.recv()
 | |
|                 logger.debug(self.ifname + ": " + ev)
 | |
|                 for event in events:
 | |
|                     if event in ev:
 | |
|                         return ev
 | |
|             now = os.times()[4]
 | |
|             remaining = start + timeout - now
 | |
|             if remaining <= 0:
 | |
|                 break
 | |
|             if not self.mon.pending(timeout=remaining):
 | |
|                 break
 | |
|         return None
 | |
| 
 | |
|     def get_status(self):
 | |
|         res = self.request("STATUS")
 | |
|         lines = res.splitlines()
 | |
|         vals = dict()
 | |
|         for l in lines:
 | |
|             [name,value] = l.split('=', 1)
 | |
|             vals[name] = value
 | |
|         return vals
 | |
| 
 | |
|     def get_status_field(self, field):
 | |
|         vals = self.get_status()
 | |
|         if field in vals:
 | |
|             return vals[field]
 | |
|         return None
 | |
| 
 | |
|     def get_driver_status(self):
 | |
|         res = self.request("STATUS-DRIVER")
 | |
|         lines = res.splitlines()
 | |
|         vals = dict()
 | |
|         for l in lines:
 | |
|             [name,value] = l.split('=', 1)
 | |
|             vals[name] = value
 | |
|         return vals
 | |
| 
 | |
|     def get_driver_status_field(self, field):
 | |
|         vals = self.get_driver_status()
 | |
|         if field in vals:
 | |
|             return vals[field]
 | |
|         return None
 | |
| 
 | |
|     def get_config(self):
 | |
|         res = self.request("GET_CONFIG")
 | |
|         lines = res.splitlines()
 | |
|         vals = dict()
 | |
|         for l in lines:
 | |
|             [name,value] = l.split('=', 1)
 | |
|             vals[name] = value
 | |
|         return vals
 | |
| 
 | |
|     def mgmt_rx(self, timeout=5):
 | |
|         ev = self.wait_event(["MGMT-RX"], timeout=timeout)
 | |
|         if ev is None:
 | |
|             return None
 | |
|         msg = {}
 | |
|         frame = binascii.unhexlify(ev.split(' ')[1])
 | |
|         msg['frame'] = frame
 | |
| 
 | |
|         hdr = struct.unpack('<HH6B6B6BH', frame[0:24])
 | |
|         msg['fc'] = hdr[0]
 | |
|         msg['subtype'] = (hdr[0] >> 4) & 0xf
 | |
|         hdr = hdr[1:]
 | |
|         msg['duration'] = hdr[0]
 | |
|         hdr = hdr[1:]
 | |
|         msg['da'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
 | |
|         hdr = hdr[6:]
 | |
|         msg['sa'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
 | |
|         hdr = hdr[6:]
 | |
|         msg['bssid'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
 | |
|         hdr = hdr[6:]
 | |
|         msg['seq_ctrl'] = hdr[0]
 | |
|         msg['payload'] = frame[24:]
 | |
| 
 | |
|         return msg
 | |
| 
 | |
|     def mgmt_tx(self, msg):
 | |
|         t = (msg['fc'], 0) + mac2tuple(msg['da']) + mac2tuple(msg['sa']) + mac2tuple(msg['bssid']) + (0,)
 | |
|         hdr = struct.pack('<HH6B6B6BH', *t)
 | |
|         self.request("MGMT_TX " + binascii.hexlify(hdr + msg['payload']))
 | |
| 
 | |
|     def get_sta(self, addr, info=None, next=False):
 | |
|         cmd = "STA-NEXT " if next else "STA "
 | |
|         if addr is None:
 | |
|             res = self.request("STA-FIRST")
 | |
|         elif info:
 | |
|             res = self.request(cmd + addr + " " + info)
 | |
|         else:
 | |
|             res = self.request(cmd + addr)
 | |
|         lines = res.splitlines()
 | |
|         vals = dict()
 | |
|         first = True
 | |
|         for l in lines:
 | |
|             if first and '=' not in l:
 | |
|                 vals['addr'] = l
 | |
|                 first = False
 | |
|             else:
 | |
|                 [name,value] = l.split('=', 1)
 | |
|                 vals[name] = value
 | |
|         return vals
 | |
| 
 | |
|     def get_mib(self, param=None):
 | |
|         if param:
 | |
|             res = self.request("MIB " + param)
 | |
|         else:
 | |
|             res = self.request("MIB")
 | |
|         lines = res.splitlines()
 | |
|         vals = dict()
 | |
|         for l in lines:
 | |
|             name_val = l.split('=', 1)
 | |
|             if len(name_val) > 1:
 | |
|                 vals[name_val[0]] = name_val[1]
 | |
|         return vals
 | |
| 
 | |
| def add_ap(ifname, params, wait_enabled=True, no_enable=False, timeout=30):
 | |
|         logger.info("Starting AP " + ifname)
 | |
|         hapd_global = HostapdGlobal()
 | |
|         hapd_global.remove(ifname)
 | |
|         hapd_global.add(ifname)
 | |
|         hapd = Hostapd(ifname)
 | |
|         if not hapd.ping():
 | |
|             raise Exception("Could not ping hostapd")
 | |
|         hapd.set_defaults()
 | |
|         fields = [ "ssid", "wpa_passphrase", "nas_identifier", "wpa_key_mgmt",
 | |
|                    "wpa",
 | |
|                    "wpa_pairwise", "rsn_pairwise", "auth_server_addr",
 | |
|                    "acct_server_addr", "osu_server_uri" ]
 | |
|         for field in fields:
 | |
|             if field in params:
 | |
|                 hapd.set(field, params[field])
 | |
|         for f,v in params.items():
 | |
|             if f in fields:
 | |
|                 continue
 | |
|             if isinstance(v, list):
 | |
|                 for val in v:
 | |
|                     hapd.set(f, val)
 | |
|             else:
 | |
|                 hapd.set(f, v)
 | |
|         if no_enable:
 | |
|             return hapd
 | |
|         hapd.enable()
 | |
|         if wait_enabled:
 | |
|             ev = hapd.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=timeout)
 | |
|             if ev is None:
 | |
|                 raise Exception("AP startup timed out")
 | |
|             if "AP-ENABLED" not in ev:
 | |
|                 raise Exception("AP startup failed")
 | |
|         return hapd
 | |
| 
 | |
| def add_bss(phy, ifname, confname, ignore_error=False):
 | |
|     logger.info("Starting BSS phy=" + phy + " ifname=" + ifname)
 | |
|     hapd_global = HostapdGlobal()
 | |
|     hapd_global.add_bss(phy, confname, ignore_error)
 | |
|     hapd = Hostapd(ifname)
 | |
|     if not hapd.ping():
 | |
|         raise Exception("Could not ping hostapd")
 | |
| 
 | |
| def add_iface(ifname, confname):
 | |
|     logger.info("Starting interface " + ifname)
 | |
|     hapd_global = HostapdGlobal()
 | |
|     hapd_global.add_iface(ifname, confname)
 | |
|     hapd = Hostapd(ifname)
 | |
|     if not hapd.ping():
 | |
|         raise Exception("Could not ping hostapd")
 | |
| 
 | |
| def remove_bss(ifname):
 | |
|     logger.info("Removing BSS " + ifname)
 | |
|     hapd_global = HostapdGlobal()
 | |
|     hapd_global.remove(ifname)
 | |
| 
 | |
| def wpa2_params(ssid=None, passphrase=None):
 | |
|     params = { "wpa": "2",
 | |
|                "wpa_key_mgmt": "WPA-PSK",
 | |
|                "rsn_pairwise": "CCMP" }
 | |
|     if ssid:
 | |
|         params["ssid"] = ssid
 | |
|     if passphrase:
 | |
|         params["wpa_passphrase"] = passphrase
 | |
|     return params
 | |
| 
 | |
| def wpa_params(ssid=None, passphrase=None):
 | |
|     params = { "wpa": "1",
 | |
|                "wpa_key_mgmt": "WPA-PSK",
 | |
|                "wpa_pairwise": "TKIP" }
 | |
|     if ssid:
 | |
|         params["ssid"] = ssid
 | |
|     if passphrase:
 | |
|         params["wpa_passphrase"] = passphrase
 | |
|     return params
 | |
| 
 | |
| def wpa_mixed_params(ssid=None, passphrase=None):
 | |
|     params = { "wpa": "3",
 | |
|                "wpa_key_mgmt": "WPA-PSK",
 | |
|                "wpa_pairwise": "TKIP",
 | |
|                "rsn_pairwise": "CCMP" }
 | |
|     if ssid:
 | |
|         params["ssid"] = ssid
 | |
|     if passphrase:
 | |
|         params["wpa_passphrase"] = passphrase
 | |
|     return params
 | |
| 
 | |
| def radius_params():
 | |
|     params = { "auth_server_addr": "127.0.0.1",
 | |
|                "auth_server_port": "1812",
 | |
|                "auth_server_shared_secret": "radius",
 | |
|                "nas_identifier": "nas.w1.fi" }
 | |
|     return params
 | |
| 
 | |
| def wpa_eap_params(ssid=None):
 | |
|     params = radius_params()
 | |
|     params["wpa"] = "1"
 | |
|     params["wpa_key_mgmt"] = "WPA-EAP"
 | |
|     params["wpa_pairwise"] = "TKIP"
 | |
|     params["ieee8021x"] = "1"
 | |
|     if ssid:
 | |
|         params["ssid"] = ssid
 | |
|     return params
 | |
| 
 | |
| def wpa2_eap_params(ssid=None):
 | |
|     params = radius_params()
 | |
|     params["wpa"] = "2"
 | |
|     params["wpa_key_mgmt"] = "WPA-EAP"
 | |
|     params["rsn_pairwise"] = "CCMP"
 | |
|     params["ieee8021x"] = "1"
 | |
|     if ssid:
 | |
|         params["ssid"] = ssid
 | |
|     return params
 |