# Python class for controlling wpa_supplicant # Copyright (c) 2013-2014, Jouni Malinen # # This software may be distributed under the terms of the BSD license. # See README for more details. import os import time import logging import binascii import re import struct import subprocess import wpaspy logger = logging.getLogger() wpas_ctrl = '/var/run/wpa_supplicant' class WpaSupplicant: def __init__(self, ifname=None, global_iface=None): self.group_ifname = None if ifname: self.set_ifname(ifname) else: self.ifname = None self.global_iface = global_iface if global_iface: self.global_ctrl = wpaspy.Ctrl(global_iface) self.global_mon = wpaspy.Ctrl(global_iface) self.global_mon.attach() def set_ifname(self, ifname): self.ifname = ifname self.ctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname)) self.mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname)) self.mon.attach() def remove_ifname(self): if self.ifname: self.mon.detach() self.mon = None self.ctrl = None self.ifname = None def interface_add(self, ifname, driver="nl80211", drv_params=None): try: groups = subprocess.check_output(["id"]) group = "admin" if "(admin)" in groups else "adm" except Exception, e: group = "admin" cmd = "INTERFACE_ADD " + ifname + "\t\t" + driver + "\tDIR=/var/run/wpa_supplicant GROUP=" + group if drv_params: cmd = cmd + '\t' + drv_params if "FAIL" in self.global_request(cmd): raise Exception("Failed to add a dynamic wpa_supplicant interface") self.set_ifname(ifname) def interface_remove(self, ifname): self.remove_ifname() self.global_request("INTERFACE_REMOVE " + ifname) def request(self, cmd): logger.debug(self.ifname + ": CTRL: " + cmd) return self.ctrl.request(cmd) def global_request(self, cmd): if self.global_iface is None: self.request(cmd) else: ifname = self.ifname or self.global_iface logger.debug(ifname + ": CTRL: " + cmd) return self.global_ctrl.request(cmd) def group_request(self, cmd): if self.group_ifname and self.group_ifname != self.ifname: logger.debug(self.group_ifname + ": CTRL: " + cmd) gctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, self.group_ifname)) return gctrl.request(cmd) return self.request(cmd) def ping(self): return "PONG" in self.request("PING") def reset(self): res = self.request("FLUSH") if not "OK" in res: logger.info("FLUSH to " + self.ifname + " failed: " + res) self.request("WPS_ER_STOP") self.request("SET pmf 0") self.request("SET external_sim 0") self.request("SET hessid 00:00:00:00:00:00") self.request("SET access_network_type 15") self.request("SET p2p_add_cli_chan 0") self.request("SET p2p_no_go_freq ") self.request("SET p2p_pref_chan ") self.request("SET p2p_no_group_iface 1") self.request("SET p2p_go_intent 7") self.group_ifname = None self.dump_monitor() iter = 0 while iter < 60: state = self.get_driver_status_field("scan_state") if "SCAN_STARTED" in state or "SCAN_REQUESTED" in state: logger.info(self.ifname + ": Waiting for scan operation to complete before continuing") time.sleep(1) else: break iter = iter + 1 if iter == 60: logger.error(self.ifname + ": Driver scan state did not clear") print "Trying to clear cfg80211/mac80211 scan state" try: cmd = ["sudo", "ifconfig", self.ifname, "down"] subprocess.call(cmd) except subprocess.CalledProcessError, e: logger.info("ifconfig failed: " + str(e.returncode)) logger.info(e.output) try: cmd = ["sudo", "ifconfig", self.ifname, "up"] subprocess.call(cmd) except subprocess.CalledProcessError, e: logger.info("ifconfig failed: " + str(e.returncode)) logger.info(e.output) if iter > 0: # The ongoing scan could have discovered BSSes or P2P peers logger.info("Run FLUSH again since scan was in progress") self.request("FLUSH") self.dump_monitor() if not self.ping(): logger.info("No PING response from " + self.ifname + " after reset") def add_network(self): id = self.request("ADD_NETWORK") if "FAIL" in id: raise Exception("ADD_NETWORK failed") return int(id) def remove_network(self, id): id = self.request("REMOVE_NETWORK " + str(id)) if "FAIL" in id: raise Exception("REMOVE_NETWORK failed") return None def get_network(self, id, field): res = self.request("GET_NETWORK " + str(id) + " " + field) if res == "FAIL\n": return None return res def set_network(self, id, field, value): res = self.request("SET_NETWORK " + str(id) + " " + field + " " + value) if "FAIL" in res: raise Exception("SET_NETWORK failed") return None def set_network_quoted(self, id, field, value): res = self.request("SET_NETWORK " + str(id) + " " + field + ' "' + value + '"') if "FAIL" in res: raise Exception("SET_NETWORK failed") return None def list_networks(self): res = self.request("LIST_NETWORKS") lines = res.splitlines() networks = [] for l in lines: if "network id" in l: continue [id,ssid,bssid,flags] = l.split('\t') network = {} network['id'] = id network['ssid'] = ssid network['bssid'] = bssid network['flags'] = flags networks.append(network) return networks def hs20_enable(self): self.request("SET interworking 1") self.request("SET hs20 1") def add_cred(self): id = self.request("ADD_CRED") if "FAIL" in id: raise Exception("ADD_CRED failed") return int(id) def remove_cred(self, id): id = self.request("REMOVE_CRED " + str(id)) if "FAIL" in id: raise Exception("REMOVE_CRED failed") return None def set_cred(self, id, field, value): res = self.request("SET_CRED " + str(id) + " " + field + " " + value) if "FAIL" in res: raise Exception("SET_CRED failed") return None def set_cred_quoted(self, id, field, value): res = self.request("SET_CRED " + str(id) + " " + field + ' "' + value + '"') if "FAIL" in res: raise Exception("SET_CRED failed") return None def get_cred(self, id, field): return self.request("GET_CRED " + str(id) + " " + field) def add_cred_values(self, params): id = self.add_cred() quoted = [ "realm", "username", "password", "domain", "imsi", "excluded_ssid", "milenage", "ca_cert", "client_cert", "private_key", "domain_suffix_match", "provisioning_sp", "roaming_partner", "phase1", "phase2" ] for field in quoted: if field in params: self.set_cred_quoted(id, field, params[field]) not_quoted = [ "eap", "roaming_consortium", "priority", "required_roaming_consortium", "sp_priority", "max_bss_load", "update_identifier", "req_conn_capab", "min_dl_bandwidth_home", "min_ul_bandwidth_home", "min_dl_bandwidth_roaming", "min_ul_bandwidth_roaming" ] for field in not_quoted: if field in params: self.set_cred(id, field, params[field]) return id; def select_network(self, id, freq=None): if freq: extra = " freq=" + freq else: extra = "" id = self.request("SELECT_NETWORK " + str(id) + extra) if "FAIL" in id: raise Exception("SELECT_NETWORK failed") return None def connect_network(self, id, timeout=10): self.dump_monitor() self.select_network(id) ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=timeout) if ev is None: raise Exception("Association with the AP timed out") self.dump_monitor() def get_status(self, extra=None): if extra: extra = "-" + extra else: extra = "" res = self.request("STATUS" + extra) lines = res.splitlines() vals = dict() for l in lines: try: [name,value] = l.split('=', 1) vals[name] = value except ValueError, e: logger.info(self.ifname + ": Ignore unexpected STATUS line: " + l) return vals def get_status_field(self, field, extra=None): vals = self.get_status(extra) if field in vals: return vals[field] return None def get_group_status(self, extra=None): if extra: extra = "-" + extra else: extra = "" res = self.group_request("STATUS" + extra) lines = res.splitlines() vals = dict() for l in lines: [name,value] = l.split('=', 1) vals[name] = value return vals def get_group_status_field(self, field, extra=None): vals = self.get_group_status(extra) if field in vals: return vals[field] return None def get_driver_status(self): res = self.request("STATUS-DRIVER") lines = res.splitlines() vals = dict() for l in lines: [name,value] = l.split('=', 1) vals[name] = value return vals def get_driver_status_field(self, field): vals = self.get_driver_status() if field in vals: return vals[field] return None def get_mib(self): res = self.request("MIB") lines = res.splitlines() vals = dict() for l in lines: try: [name,value] = l.split('=', 1) vals[name] = value except ValueError, e: logger.info(self.ifname + ": Ignore unexpected MIB line: " + l) return vals def p2p_dev_addr(self): return self.get_status_field("p2p_device_address") def p2p_interface_addr(self): return self.get_group_status_field("address") def p2p_listen(self): return self.global_request("P2P_LISTEN") def p2p_find(self, social=False, dev_id=None, dev_type=None): cmd = "P2P_FIND" if social: cmd = cmd + " type=social" if dev_id: cmd = cmd + " dev_id=" + dev_id if dev_type: cmd = cmd + " dev_type=" + dev_type return self.global_request(cmd) def p2p_stop_find(self): return self.global_request("P2P_STOP_FIND") def wps_read_pin(self): self.pin = self.request("WPS_PIN get").rstrip("\n") if "FAIL" in self.pin: raise Exception("Could not generate PIN") return self.pin def peer_known(self, peer, full=True): res = self.global_request("P2P_PEER " + peer) if peer.lower() not in res.lower(): return False if not full: return True return "[PROBE_REQ_ONLY]" not in res def discover_peer(self, peer, full=True, timeout=15, social=True, force_find=False): logger.info(self.ifname + ": Trying to discover peer " + peer) if not force_find and self.peer_known(peer, full): return True self.p2p_find(social) count = 0 while count < timeout: time.sleep(1) count = count + 1 if self.peer_known(peer, full): return True return False def get_peer(self, peer): res = self.global_request("P2P_PEER " + peer) if peer.lower() not in res.lower(): raise Exception("Peer information not available") lines = res.splitlines() vals = dict() for l in lines: if '=' in l: [name,value] = l.split('=', 1) vals[name] = value return vals def group_form_result(self, ev, expect_failure=False, go_neg_res=None): if expect_failure: if "P2P-GROUP-STARTED" in ev: raise Exception("Group formation succeeded when expecting failure") exp = r'<.>(P2P-GO-NEG-FAILURE) status=([0-9]*)' s = re.split(exp, ev) if len(s) < 3: return None res = {} res['result'] = 'go-neg-failed' res['status'] = int(s[2]) return res if "P2P-GROUP-STARTED" not in ev: raise Exception("No P2P-GROUP-STARTED event seen") exp = r'<.>(P2P-GROUP-STARTED) ([^ ]*) ([^ ]*) ssid="(.*)" freq=([0-9]*) ((?:psk=.*)|(?:passphrase=".*")) go_dev_addr=([0-9a-f:]*) ip_addr=([0-9.]*) ip_mask=([0-9.]*) go_ip_addr=([0-9.]*)' s = re.split(exp, ev) if len(s) < 11: exp = r'<.>(P2P-GROUP-STARTED) ([^ ]*) ([^ ]*) ssid="(.*)" freq=([0-9]*) ((?:psk=.*)|(?:passphrase=".*")) go_dev_addr=([0-9a-f:]*)' s = re.split(exp, ev) if len(s) < 8: raise Exception("Could not parse P2P-GROUP-STARTED") res = {} res['result'] = 'success' res['ifname'] = s[2] self.group_ifname = s[2] res['role'] = s[3] res['ssid'] = s[4] res['freq'] = s[5] if "[PERSISTENT]" in ev: res['persistent'] = True else: res['persistent'] = False p = re.match(r'psk=([0-9a-f]*)', s[6]) if p: res['psk'] = p.group(1) p = re.match(r'passphrase="(.*)"', s[6]) if p: res['passphrase'] = p.group(1) res['go_dev_addr'] = s[7] if len(s) > 8 and len(s[8]) > 0: res['ip_addr'] = s[8] if len(s) > 9: res['ip_mask'] = s[9] if len(s) > 10: res['go_ip_addr'] = s[10] if go_neg_res: exp = r'<.>(P2P-GO-NEG-SUCCESS) role=(GO|client) freq=([0-9]*)' s = re.split(exp, go_neg_res) if len(s) < 4: raise Exception("Could not parse P2P-GO-NEG-SUCCESS") res['go_neg_role'] = s[2] res['go_neg_freq'] = s[3] return res def p2p_go_neg_auth(self, peer, pin, method, go_intent=None, persistent=False, freq=None): if not self.discover_peer(peer): raise Exception("Peer " + peer + " not found") self.dump_monitor() cmd = "P2P_CONNECT " + peer + " " + pin + " " + method + " auth" if go_intent: cmd = cmd + ' go_intent=' + str(go_intent) if freq: cmd = cmd + ' freq=' + str(freq) if persistent: cmd = cmd + " persistent" if "OK" in self.global_request(cmd): return None raise Exception("P2P_CONNECT (auth) failed") def p2p_go_neg_auth_result(self, timeout=1, expect_failure=False): go_neg_res = None ev = self.wait_global_event(["P2P-GO-NEG-SUCCESS", "P2P-GO-NEG-FAILURE"], timeout); if ev is None: if expect_failure: return None raise Exception("Group formation timed out") if "P2P-GO-NEG-SUCCESS" in ev: go_neg_res = ev ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout); if ev is None: if expect_failure: return None raise Exception("Group formation timed out") self.dump_monitor() return self.group_form_result(ev, expect_failure, go_neg_res) def p2p_go_neg_init(self, peer, pin, method, timeout=0, go_intent=None, expect_failure=False, persistent=False, freq=None, provdisc=False): if not self.discover_peer(peer): raise Exception("Peer " + peer + " not found") self.dump_monitor() if pin: cmd = "P2P_CONNECT " + peer + " " + pin + " " + method else: cmd = "P2P_CONNECT " + peer + " " + method if go_intent: cmd = cmd + ' go_intent=' + str(go_intent) if freq: cmd = cmd + ' freq=' + str(freq) if persistent: cmd = cmd + " persistent" if provdisc: cmd = cmd + " provdisc" if "OK" in self.global_request(cmd): if timeout == 0: self.dump_monitor() return None go_neg_res = None ev = self.wait_global_event(["P2P-GO-NEG-SUCCESS", "P2P-GO-NEG-FAILURE"], timeout) if ev is None: if expect_failure: return None raise Exception("Group formation timed out") if "P2P-GO-NEG-SUCCESS" in ev: go_neg_res = ev ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout) if ev is None: if expect_failure: return None raise Exception("Group formation timed out") self.dump_monitor() return self.group_form_result(ev, expect_failure, go_neg_res) raise Exception("P2P_CONNECT failed") def wait_event(self, events, timeout=10): start = os.times()[4] while True: while self.mon.pending(): ev = self.mon.recv() logger.debug(self.ifname + ": " + ev) for event in events: if event in ev: return ev now = os.times()[4] remaining = start + timeout - now if remaining <= 0: break if not self.mon.pending(timeout=remaining): break return None def wait_global_event(self, events, timeout): if self.global_iface is None: self.wait_event(events, timeout) else: start = os.times()[4] while True: while self.global_mon.pending(): ev = self.global_mon.recv() logger.debug(self.ifname + "(global): " + ev) for event in events: if event in ev: return ev now = os.times()[4] remaining = start + timeout - now if remaining <= 0: break if not self.global_mon.pending(timeout=remaining): break return None def wait_go_ending_session(self): ev = self.wait_event(["P2P-GROUP-REMOVED"], timeout=3) if ev is None: raise Exception("Group removal event timed out") if "reason=GO_ENDING_SESSION" not in ev: raise Exception("Unexpected group removal reason") def dump_monitor(self): while self.mon.pending(): ev = self.mon.recv() logger.debug(self.ifname + ": " + ev) while self.global_mon.pending(): ev = self.global_mon.recv() logger.debug(self.ifname + "(global): " + ev) def remove_group(self, ifname=None): if ifname is None: ifname = self.group_ifname if self.group_ifname else self.ifname if "OK" not in self.global_request("P2P_GROUP_REMOVE " + ifname): raise Exception("Group could not be removed") self.group_ifname = None def p2p_start_go(self, persistent=None, freq=None): self.dump_monitor() cmd = "P2P_GROUP_ADD" if persistent is None: pass elif persistent is True: cmd = cmd + " persistent" else: cmd = cmd + " persistent=" + str(persistent) if freq: cmd = cmd + " freq=" + str(freq) if "OK" in self.global_request(cmd): ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout=5) if ev is None: raise Exception("GO start up timed out") self.dump_monitor() return self.group_form_result(ev) raise Exception("P2P_GROUP_ADD failed") def p2p_go_authorize_client(self, pin): cmd = "WPS_PIN any " + pin if "FAIL" in self.group_request(cmd): raise Exception("Failed to authorize client connection on GO") return None def p2p_go_authorize_client_pbc(self): cmd = "WPS_PBC" if "FAIL" in self.group_request(cmd): raise Exception("Failed to authorize client connection on GO") return None def p2p_connect_group(self, go_addr, pin, timeout=0, social=False): self.dump_monitor() if not self.discover_peer(go_addr, social=social): raise Exception("GO " + go_addr + " not found") self.dump_monitor() cmd = "P2P_CONNECT " + go_addr + " " + pin + " join" if "OK" in self.global_request(cmd): if timeout == 0: self.dump_monitor() return None ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout) if ev is None: raise Exception("Joining the group timed out") self.dump_monitor() return self.group_form_result(ev) raise Exception("P2P_CONNECT(join) failed") def tdls_setup(self, peer): cmd = "TDLS_SETUP " + peer if "FAIL" in self.group_request(cmd): raise Exception("Failed to request TDLS setup") return None def tdls_teardown(self, peer): cmd = "TDLS_TEARDOWN " + peer if "FAIL" in self.group_request(cmd): raise Exception("Failed to request TDLS teardown") return None def connect(self, ssid=None, ssid2=None, **kwargs): logger.info("Connect STA " + self.ifname + " to AP") id = self.add_network() if ssid: self.set_network_quoted(id, "ssid", ssid) elif ssid2: self.set_network(id, "ssid", ssid2) quoted = [ "psk", "identity", "anonymous_identity", "password", "ca_cert", "client_cert", "private_key", "private_key_passwd", "ca_cert2", "client_cert2", "private_key2", "phase1", "phase2", "domain_suffix_match", "altsubject_match", "subject_match", "pac_file", "dh_file", "bgscan" ] for field in quoted: if field in kwargs and kwargs[field]: self.set_network_quoted(id, field, kwargs[field]) not_quoted = [ "proto", "key_mgmt", "ieee80211w", "pairwise", "group", "wep_key0", "scan_freq", "eap", "eapol_flags", "fragment_size", "scan_ssid", "auth_alg", "wpa_ptk_rekey", "disable_ht", "disable_vht", "bssid" ] for field in not_quoted: if field in kwargs and kwargs[field]: self.set_network(id, field, kwargs[field]) if "raw_psk" in kwargs and kwargs['raw_psk']: self.set_network(id, "psk", kwargs['raw_psk']) if "password_hex" in kwargs and kwargs['password_hex']: self.set_network(id, "password", kwargs['password_hex']) if "peerkey" in kwargs and kwargs['peerkey']: self.set_network(id, "peerkey", "1") if "okc" in kwargs and kwargs['okc']: self.set_network(id, "proactive_key_caching", "1") if "ocsp" in kwargs and kwargs['ocsp']: self.set_network(id, "ocsp", str(kwargs['ocsp'])) if "only_add_network" in kwargs and kwargs['only_add_network']: return id if "wait_connect" not in kwargs or kwargs['wait_connect']: if "eap" in kwargs: self.connect_network(id, timeout=20) else: self.connect_network(id) else: self.dump_monitor() self.select_network(id) return id def scan(self, type=None, freq=None, no_wait=False, only_new=False): if type: cmd = "SCAN TYPE=" + type else: cmd = "SCAN" if freq: cmd = cmd + " freq=" + freq if only_new: cmd += " only_new=1" if not no_wait: self.dump_monitor() if not "OK" in self.request(cmd): raise Exception("Failed to trigger scan") if no_wait: return ev = self.wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15) if ev is None: raise Exception("Scan timed out") def roam(self, bssid, fail_test=False): self.dump_monitor() self.request("ROAM " + bssid) if fail_test: ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=1) if ev is not None: raise Exception("Unexpected connection") self.dump_monitor() return ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=10) if ev is None: raise Exception("Roaming with the AP timed out") self.dump_monitor() def roam_over_ds(self, bssid, fail_test=False): self.dump_monitor() self.request("FT_DS " + bssid) if fail_test: ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=1) if ev is not None: raise Exception("Unexpected connection") self.dump_monitor() return ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=10) if ev is None: raise Exception("Roaming with the AP timed out") self.dump_monitor() def wps_reg(self, bssid, pin, new_ssid=None, key_mgmt=None, cipher=None, new_passphrase=None, no_wait=False): self.dump_monitor() if new_ssid: self.request("WPS_REG " + bssid + " " + pin + " " + new_ssid.encode("hex") + " " + key_mgmt + " " + cipher + " " + new_passphrase.encode("hex")) if no_wait: return ev = self.wait_event(["WPS-SUCCESS"], timeout=15) else: self.request("WPS_REG " + bssid + " " + pin) if no_wait: return ev = self.wait_event(["WPS-CRED-RECEIVED"], timeout=15) if ev is None: raise Exception("WPS cred timed out") ev = self.wait_event(["WPS-FAIL"], timeout=15) if ev is None: raise Exception("WPS timed out") ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=15) if ev is None: raise Exception("Association with the AP timed out") def relog(self): self.request("RELOG") def wait_completed(self, timeout=10): for i in range(0, timeout * 2): if self.get_status_field("wpa_state") == "COMPLETED": return time.sleep(0.5) raise Exception("Timeout while waiting for COMPLETED state") def get_capability(self, field): res = self.request("GET_CAPABILITY " + field) if "FAIL" in res: return None return res.split(' ') def get_bss(self, bssid): res = self.request("BSS " + bssid) lines = res.splitlines() vals = dict() for l in lines: [name,value] = l.split('=', 1) vals[name] = value return vals def get_pmksa(self, bssid): res = self.request("PMKSA") lines = res.splitlines() for l in lines: if bssid not in l: continue vals = dict() [index,aa,pmkid,expiration,opportunistic] = l.split(' ') vals['index'] = index vals['pmkid'] = pmkid vals['expiration'] = expiration vals['opportunistic'] = opportunistic return vals return None def get_sta(self, addr, info=None, next=False): cmd = "STA-NEXT " if next else "STA " if addr is None: res = self.request("STA-FIRST") elif info: res = self.request(cmd + addr + " " + info) else: res = self.request(cmd + addr) lines = res.splitlines() vals = dict() first = True for l in lines: if first: vals['addr'] = l first = False else: [name,value] = l.split('=', 1) vals[name] = value return vals def mgmt_rx(self, timeout=5): ev = self.wait_event(["MGMT-RX"], timeout=timeout) if ev is None: return None msg = {} items = ev.split(' ') field,val = items[1].split('=') if field != "freq": raise Exception("Unexpected MGMT-RX event format: " + ev) msg['freq'] = val frame = binascii.unhexlify(items[4]) msg['frame'] = frame hdr = struct.unpack('> 4) & 0xf hdr = hdr[1:] msg['duration'] = hdr[0] hdr = hdr[1:] msg['da'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6] hdr = hdr[6:] msg['sa'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6] hdr = hdr[6:] msg['bssid'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6] hdr = hdr[6:] msg['seq_ctrl'] = hdr[0] msg['payload'] = frame[24:] return msg