#!/usr/bin/python
#
# Python class for controlling hostapd
# Copyright (c) 2013-2014, Jouni Malinen
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.

import os
import time
import logging
import binascii
import struct

import wpaspy

logger = logging.getLogger()
hapd_ctrl = '/var/run/hostapd'
hapd_global = '/var/run/hostapd-global'

# Layout used by mgmt_rx()/mgmt_tx() for the fixed 24-byte management frame
# header: frame control, duration, three 6-byte addresses, sequence control.
# NOTE(review): the literal was lost from the source text; this value is
# reconstructed from the 21-element tuple built in mgmt_tx() and the
# 24-byte payload offset used in mgmt_rx().
_MGMT_HDR_FMT = '<HH6B6B6BH'
_MGMT_HDR_LEN = 24
_MAC_FMT = "%02x:%02x:%02x:%02x:%02x:%02x"


def mac2tuple(mac):
    """Convert a colon-separated MAC address string into a tuple of 6 ints."""
    return struct.unpack('6B', binascii.unhexlify(mac.replace(':', '')))


class HostapdGlobal:
    """Wrapper around the hostapd global control interface socket."""

    def __init__(self):
        self.ctrl = wpaspy.Ctrl(hapd_global)

    def add(self, ifname):
        """Send "ADD <ifname> <hapd_ctrl>"; raise Exception unless OK."""
        res = self.ctrl.request("ADD " + ifname + " " + hapd_ctrl)
        if "OK" not in res:
            raise Exception("Could not add hostapd interface " + ifname)

    def add_iface(self, ifname, confname):
        """Send "ADD <ifname> config=<confname>"; raise Exception unless OK."""
        res = self.ctrl.request("ADD " + ifname + " config=" + confname)
        if "OK" not in res:
            raise Exception("Could not add hostapd interface")

    def add_bss(self, phy, confname, ignore_error=False):
        """Send "ADD bss_config=<phy>:<confname>".

        Raises Exception when the reply does not contain "OK", unless
        ignore_error is true, in which case the failure is silently dropped.
        """
        res = self.ctrl.request("ADD bss_config=" + phy + ":" + confname)
        if "OK" not in res and not ignore_error:
            raise Exception("Could not add hostapd BSS")

    def remove(self, ifname):
        """Send "REMOVE <ifname>"; the reply is not checked."""
        self.ctrl.request("REMOVE " + ifname)

    def relog(self):
        """Send "RELOG"; the reply is not checked."""
        self.ctrl.request("RELOG")


class Hostapd:
    """Wrapper around the per-interface hostapd control socket.

    Two connections are opened to the same socket path: self.ctrl for
    request/response commands and self.mon, which is attached so that it
    receives unsolicited event messages.
    """

    def __init__(self, ifname):
        self.ifname = ifname
        self.ctrl = wpaspy.Ctrl(os.path.join(hapd_ctrl, ifname))
        self.mon = wpaspy.Ctrl(os.path.join(hapd_ctrl, ifname))
        self.mon.attach()

    def request(self, cmd):
        """Log cmd at debug level, send it on self.ctrl, return the reply."""
        logger.debug(self.ifname + ": CTRL: " + cmd)
        return self.ctrl.request(cmd)

    def ping(self):
        """Return True if the reply to "PING" contains "PONG"."""
        return "PONG" in self.request("PING")

    def set(self, field, value):
        """Send "SET <field> <value>"; raise Exception unless OK."""
        if "OK" not in self.request("SET " + field + " " + value):
            raise Exception("Failed to set hostapd parameter " + field)

    def set_defaults(self):
        """Apply the baseline parameters shared by the set_*() helpers."""
        self.set("driver", "nl80211")
        self.set("hw_mode", "g")
        self.set("channel", "1")
        self.set("ieee80211n", "1")
        self.set("logger_stdout", "-1")
        self.set("logger_stdout_level", "0")

    def set_open(self, ssid):
        """Apply the defaults and set only the SSID."""
        self.set_defaults()
        self.set("ssid", ssid)

    def set_wpa2_psk(self, ssid, passphrase):
        """Apply the defaults plus wpa=2, WPA-PSK and CCMP (rsn_pairwise)."""
        self.set_defaults()
        self.set("ssid", ssid)
        self.set("wpa_passphrase", passphrase)
        self.set("wpa", "2")
        self.set("wpa_key_mgmt", "WPA-PSK")
        self.set("rsn_pairwise", "CCMP")

    def set_wpa_psk(self, ssid, passphrase):
        """Apply the defaults plus wpa=1, WPA-PSK and TKIP (wpa_pairwise)."""
        self.set_defaults()
        self.set("ssid", ssid)
        self.set("wpa_passphrase", passphrase)
        self.set("wpa", "1")
        self.set("wpa_key_mgmt", "WPA-PSK")
        self.set("wpa_pairwise", "TKIP")

    def set_wpa_psk_mixed(self, ssid, passphrase):
        """Apply the defaults plus wpa=3, WPA-PSK, TKIP and CCMP."""
        self.set_defaults()
        self.set("ssid", ssid)
        self.set("wpa_passphrase", passphrase)
        self.set("wpa", "3")
        self.set("wpa_key_mgmt", "WPA-PSK")
        self.set("wpa_pairwise", "TKIP")
        self.set("rsn_pairwise", "CCMP")

    def set_wep(self, ssid, key):
        """Apply the defaults, then set the SSID and wep_key0."""
        self.set_defaults()
        self.set("ssid", ssid)
        self.set("wep_key0", key)

    def enable(self):
        """Send "ENABLE"; raise Exception unless OK."""
        if "OK" not in self.request("ENABLE"):
            raise Exception("Failed to enable hostapd interface " +
                            self.ifname)

    def disable(self):
        """Send "DISABLE"; raise Exception unless OK."""
        # This previously sent "ENABLE", so a "disable" call actually
        # (re-)enabled the interface.
        if "OK" not in self.request("DISABLE"):
            raise Exception("Failed to disable hostapd interface " +
                            self.ifname)

    def dump_monitor(self):
        """Drain every pending message from the monitor socket, logging each."""
        while self.mon.pending():
            ev = self.mon.recv()
            logger.debug(self.ifname + ": " + ev)

    def wait_event(self, events, timeout):
        """Wait for a monitor message containing any of the given substrings.

        events is an iterable of substrings; timeout is in seconds. Returns
        the first matching message, or None when the timeout expires or the
        monitor socket has nothing further to deliver in the remaining time.
        Non-matching messages are logged and discarded.
        """
        # os.times()[4] is elapsed real time, so the deadline is unaffected
        # by wall-clock adjustments.
        start = os.times()[4]
        while True:
            while self.mon.pending():
                ev = self.mon.recv()
                logger.debug(self.ifname + ": " + ev)
                for event in events:
                    if event in ev:
                        return ev
            now = os.times()[4]
            remaining = start + timeout - now
            if remaining <= 0:
                break
            if not self.mon.pending(timeout=remaining):
                break
        return None

    def get_status(self):
        """Return the "STATUS" reply parsed into a dict of name -> value.

        Each reply line is split on its first '='. A line without '='
        raises ValueError (from the unpacking), as before.
        """
        res = self.request("STATUS")
        vals = dict()
        for line in res.splitlines():
            name, value = line.split('=', 1)
            vals[name] = value
        return vals

    def get_status_field(self, field):
        """Return one field from get_status(), or None if it is absent."""
        return self.get_status().get(field)

    def mgmt_rx(self, timeout=5):
        """Wait for an "MGMT-RX" event and parse the reported frame.

        Returns None on timeout. Otherwise returns a dict with 'frame' (raw
        bytes), 'fc', 'subtype', 'duration', 'da', 'sa', 'bssid' (the last
        three as colon-separated hex strings), 'seq_ctrl' and 'payload'
        (everything after the 24-byte header).
        """
        ev = self.wait_event(["MGMT-RX"], timeout=timeout)
        if ev is None:
            return None
        msg = {}
        # The hex dump of the frame is the second space-separated token.
        frame = binascii.unhexlify(ev.split(' ')[1])
        msg['frame'] = frame

        # NOTE(review): the unpack call and the two assignments below were
        # partly lost from the source text ("struct.unpack('> 4) & 0xf").
        # 'fc' is grounded by mgmt_tx() reading msg['fc']; the 'subtype' key
        # name is a reconstruction - confirm against the original file.
        hdr = struct.unpack(_MGMT_HDR_FMT, frame[0:_MGMT_HDR_LEN])
        msg['fc'] = hdr[0]
        msg['subtype'] = (hdr[0] >> 4) & 0xf
        msg['duration'] = hdr[1]
        msg['da'] = _MAC_FMT % hdr[2:8]
        msg['sa'] = _MAC_FMT % hdr[8:14]
        msg['bssid'] = _MAC_FMT % hdr[14:20]
        msg['seq_ctrl'] = hdr[20]
        msg['payload'] = frame[_MGMT_HDR_LEN:]

        return msg

    def mgmt_tx(self, msg):
        """Build a management frame from msg and hand it to hostapd.

        msg must provide 'fc', 'da', 'sa', 'bssid' and 'payload'. Duration
        and sequence control are always packed as 0.
        """
        t = ((msg['fc'], 0) + mac2tuple(msg['da']) + mac2tuple(msg['sa']) +
             mac2tuple(msg['bssid']) + (0,))
        hdr = struct.pack(_MGMT_HDR_FMT, *t)
        # NOTE(review): the source text is truncated right after
        # "struct.pack('"; the request below is a reconstruction of the
        # missing tail and must be confirmed against the original file.
        # .decode() keeps the concatenation valid when hexlify() returns
        # bytes.
        self.request("MGMT_TX " +
                     binascii.hexlify(hdr + msg['payload']).decode())