hostap/tests/hwsim/wpasupplicant.py
Jouni Malinen 803edd1c09 tests: Fix BSS table flushing and old result ignoring for WPS tests
The WPS tests are more prone to fail if scan results from previous
test cases are allowed to remain in the wpa_supplicant BSS table during
the subsequent test, since the test setup uses the same BSSID for test
APs that change their configuration. Avoid these mostly bogus
failures by forcing wpa_supplicant to drop and ignore old scan results
during the WPS test cases.

Signed-hostap: Jouni Malinen <j@w1.fi>
2013-03-31 18:05:42 +03:00

355 lines
12 KiB
Python

#!/usr/bin/python
#
# Python class for controlling wpa_supplicant
# Copyright (c) 2013, Jouni Malinen <j@w1.fi>
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
import os
import time
import logging
import re
import wpaspy
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Directory that is joined with an interface name to form the path handed
# to wpaspy.Ctrl.
wpas_ctrl = '/var/run/wpa_supplicant'
class WpaSupplicant:
    """Drive one wpa_supplicant interface through wpaspy control connections.

    Two wpaspy.Ctrl objects are opened on the same path: ``ctrl`` is used
    for sending commands and ``mon`` is attached and polled for event
    messages. ``group_ifname`` holds the interface name parsed from the most
    recent P2P-GROUP-STARTED event, or None when no group is tracked.
    """

    def __init__(self, ifname):
        """Open the command and monitor connections for *ifname*."""
        self.ifname = ifname
        self.group_ifname = None
        self.ctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
        self.mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
        self.mon.attach()

    def request(self, cmd):
        """Log *cmd*, send it on the command connection, return the reply."""
        logger.debug(self.ifname + ": CTRL: " + cmd)
        return self.ctrl.request(cmd)

    def group_request(self, cmd):
        """Send *cmd* to the group interface when it is a separate one.

        A fresh connection to ``group_ifname`` is opened for the command when
        a group interface is tracked and differs from ``ifname``; otherwise
        the command goes through request().
        """
        if self.group_ifname and self.group_ifname != self.ifname:
            logger.debug(self.group_ifname + ": CTRL: " + cmd)
            gctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, self.group_ifname))
            return gctrl.request(cmd)
        return self.request(cmd)

    def ping(self):
        """Return True when the reply to PING contains "PONG"."""
        return "PONG" in self.request("PING")

    def reset(self):
        """Issue the fixed list of cleanup commands and forget the group.

        The replies are not checked.
        """
        for cmd in ("P2P_STOP_FIND",
                    "P2P_FLUSH",
                    "P2P_GROUP_REMOVE *",
                    "REMOVE_NETWORK *",
                    "REMOVE_CRED *",
                    "SET tdls_disabled 0",
                    "SET tdls_testing 0",
                    "SET ignore_old_scan_res 0"):
            self.request(cmd)
        self.group_ifname = None

    def add_network(self):
        """Send ADD_NETWORK and return the reply converted to int.

        Raises Exception when the reply contains "FAIL".
        """
        res = self.request("ADD_NETWORK")
        if "FAIL" in res:
            raise Exception("ADD_NETWORK failed")
        return int(res)

    def remove_network(self, id):
        """Send REMOVE_NETWORK for *id*; raise Exception on a FAIL reply."""
        res = self.request("REMOVE_NETWORK " + str(id))
        if "FAIL" in res:
            raise Exception("REMOVE_NETWORK failed")
        return None

    def set_network(self, id, field, value):
        """Send SET_NETWORK with *value* passed through unquoted.

        Raises Exception when the reply contains "FAIL".
        """
        res = self.request("SET_NETWORK " + str(id) + " " + field + " " + value)
        if "FAIL" in res:
            raise Exception("SET_NETWORK failed")
        return None

    def set_network_quoted(self, id, field, value):
        """Send SET_NETWORK with *value* wrapped in double quotes.

        Raises Exception when the reply contains "FAIL".
        """
        res = self.request("SET_NETWORK " + str(id) + " " + field + ' "' + value + '"')
        if "FAIL" in res:
            raise Exception("SET_NETWORK failed")
        return None

    def select_network(self, id):
        """Send SELECT_NETWORK for *id*; raise Exception on a FAIL reply."""
        res = self.request("SELECT_NETWORK " + str(id))
        if "FAIL" in res:
            raise Exception("SELECT_NETWORK failed")
        return None

    def connect_network(self, id):
        """Select network *id* and wait up to 10 s for CTRL-EVENT-CONNECTED.

        Pending monitor events are drained before and after. Raises
        Exception when the event does not arrive in time.
        """
        self.dump_monitor()
        self.select_network(id)
        ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=10)
        if ev is None:
            raise Exception("Association with the AP timed out")
        self.dump_monitor()

    @staticmethod
    def _parse_status(res):
        """Turn the 'name=value' lines of a STATUS reply into a dict."""
        vals = {}
        for line in res.splitlines():
            # Split on the first '=' only; values may contain '=' themselves.
            name, sep, value = line.partition('=')
            if not sep:
                # A line without '=' carries no field; skip it rather than
                # failing the whole status query.
                continue
            vals[name] = value
        return vals

    def get_status(self):
        """Return the STATUS reply of this interface as a dict."""
        return self._parse_status(self.request("STATUS"))

    def get_status_field(self, field):
        """Return one STATUS value, or None when *field* is not reported."""
        return self.get_status().get(field)

    def get_group_status(self):
        """Return the STATUS reply obtained via group_request() as a dict."""
        return self._parse_status(self.group_request("STATUS"))

    def get_group_status_field(self, field):
        """Return one group STATUS value, or None when it is not reported."""
        return self.get_group_status().get(field)

    def p2p_dev_addr(self):
        """Return the "p2p_device_address" STATUS field (None if absent)."""
        return self.get_status_field("p2p_device_address")

    def p2p_interface_addr(self):
        """Return the "address" field of the group STATUS (None if absent)."""
        return self.get_group_status_field("address")

    def p2p_listen(self):
        """Send P2P_LISTEN and return the reply."""
        return self.request("P2P_LISTEN")

    def p2p_find(self, social=False):
        """Send P2P_FIND, adding "type=social" when *social* is true."""
        if social:
            return self.request("P2P_FIND type=social")
        return self.request("P2P_FIND")

    def p2p_stop_find(self):
        """Send P2P_STOP_FIND and return the reply."""
        return self.request("P2P_STOP_FIND")

    def wps_read_pin(self):
        """Store a fixed PIN in ``self.pin`` and return it."""
        #TODO: make this random
        self.pin = "12345670"
        return self.pin

    def peer_known(self, peer, full=True):
        """Report whether the P2P_PEER reply mentions *peer*.

        The address comparison is case-insensitive. With *full* true, a
        reply that contains "[PROBE_REQ_ONLY]" does not count as known.
        """
        res = self.request("P2P_PEER " + peer)
        if peer.lower() not in res.lower():
            return False
        if not full:
            return True
        return "[PROBE_REQ_ONLY]" not in res

    def discover_peer(self, peer, full=True, timeout=15, social=True):
        """Start a find if needed and poll once per second for *peer*.

        Returns True as soon as peer_known() succeeds, or False after
        *timeout* one-second polls without success.
        """
        logger.info(self.ifname + ": Trying to discover peer " + peer)
        if self.peer_known(peer, full):
            return True
        self.p2p_find(social)
        count = 0
        while count < timeout:
            time.sleep(1)
            count = count + 1
            if self.peer_known(peer, full):
                return True
        return False

    def group_form_result(self, ev, expect_failure=False):
        """Parse a group formation event line into a dict.

        With *expect_failure* set, a P2P-GROUP-STARTED event raises
        Exception; a P2P-GO-NEG-FAILURE event yields
        ``{'result': 'go-neg-failed', 'status': <int>}`` and anything else
        yields None.

        Otherwise *ev* must be a P2P-GROUP-STARTED event. The returned dict
        has 'result' ('success'), 'ifname', 'role', 'ssid', 'freq' (string),
        'go_dev_addr' and either 'psk' or 'passphrase'. The parsed interface
        name is also stored in ``self.group_ifname``. Raises Exception when
        the event is missing or cannot be parsed.
        """
        if expect_failure:
            if "P2P-GROUP-STARTED" in ev:
                raise Exception("Group formation succeeded when expecting failure")
            exp = r'<.>(P2P-GO-NEG-FAILURE) status=([0-9]*)'
            s = re.split(exp, ev)
            if len(s) < 3:
                return None
            return {'result': 'go-neg-failed', 'status': int(s[2])}

        if "P2P-GROUP-STARTED" not in ev:
            raise Exception("No P2P-GROUP-STARTED event seen")

        exp = r'<.>(P2P-GROUP-STARTED) ([^ ]*) ([^ ]*) ssid="(.*)" freq=([0-9]*) ((?:psk=.*)|(?:passphrase=".*")) go_dev_addr=([0-9a-f:]*)'
        # re.split() returns the text before the match followed by the
        # capture groups, so s[1]..s[7] are the seven groups of the pattern.
        s = re.split(exp, ev)
        if len(s) < 8:
            raise Exception("Could not parse P2P-GROUP-STARTED")
        res = {}
        res['result'] = 'success'
        res['ifname'] = s[2]
        self.group_ifname = s[2]
        res['role'] = s[3]
        res['ssid'] = s[4]
        res['freq'] = s[5]
        # s[6] is either 'psk=<hex>' or 'passphrase="<text>"'.
        p = re.match(r'psk=([0-9a-f]*)', s[6])
        if p:
            res['psk'] = p.group(1)
        p = re.match(r'passphrase="(.*)"', s[6])
        if p:
            res['passphrase'] = p.group(1)
        res['go_dev_addr'] = s[7]
        return res

    def p2p_go_neg_auth(self, peer, pin, method, go_intent=None):
        """Discover *peer* and send P2P_CONNECT with the "auth" keyword.

        Raises Exception when the peer is not found or the reply does not
        contain "OK".
        """
        if not self.discover_peer(peer):
            raise Exception("Peer " + peer + " not found")
        self.dump_monitor()
        cmd = "P2P_CONNECT " + peer + " " + pin + " " + method + " auth"
        # Compare against None so that an explicit intent of 0 is still sent.
        if go_intent is not None:
            cmd = cmd + ' go_intent=' + str(go_intent)
        if "OK" in self.request(cmd):
            return None
        raise Exception("P2P_CONNECT (auth) failed")

    def p2p_go_neg_auth_result(self, timeout=1, expect_failure=False):
        """Wait for the group formation outcome and parse it.

        Returns the group_form_result() dict. When no event arrives within
        *timeout* seconds, returns None if *expect_failure* is set and
        raises Exception otherwise.
        """
        ev = self.wait_event(["P2P-GROUP-STARTED", "P2P-GO-NEG-FAILURE"], timeout)
        if ev is None:
            if expect_failure:
                return None
            raise Exception("Group formation timed out")
        self.dump_monitor()
        return self.group_form_result(ev, expect_failure)

    def p2p_go_neg_init(self, peer, pin, method, timeout=0, go_intent=None, expect_failure=False):
        """Discover *peer*, send P2P_CONNECT and optionally wait for the result.

        *pin* is left out of the command when it is empty or None. With
        *timeout* equal to 0 the method returns None right after the command
        is accepted; otherwise it waits like p2p_go_neg_auth_result().
        Raises Exception when the peer is not found, the reply does not
        contain "OK", or the wait times out without *expect_failure*.
        """
        if not self.discover_peer(peer):
            raise Exception("Peer " + peer + " not found")
        self.dump_monitor()
        if pin:
            cmd = "P2P_CONNECT " + peer + " " + pin + " " + method
        else:
            cmd = "P2P_CONNECT " + peer + " " + method
        # Compare against None so that an explicit intent of 0 is still sent.
        if go_intent is not None:
            cmd = cmd + ' go_intent=' + str(go_intent)
        if "OK" not in self.request(cmd):
            raise Exception("P2P_CONNECT failed")
        if timeout == 0:
            self.dump_monitor()
            return None
        return self.p2p_go_neg_auth_result(timeout, expect_failure)

    def wait_event(self, events, timeout):
        """Poll the monitor connection for a message containing any of *events*.

        The monitor is checked every 0.1 s for roughly *timeout* seconds.
        Every received message is logged. Returns the first matching message,
        or None when time runs out.
        """
        count = 0
        while count < timeout * 10:
            count = count + 1
            time.sleep(0.1)
            while self.mon.pending():
                ev = self.mon.recv()
                logger.debug(self.ifname + ": " + ev)
                for event in events:
                    if event in ev:
                        return ev
        return None

    def dump_monitor(self):
        """Read and log every message currently pending on the monitor."""
        while self.mon.pending():
            ev = self.mon.recv()
            logger.debug(self.ifname + ": " + ev)

    def remove_group(self, ifname=None):
        """Send P2P_GROUP_REMOVE and clear the tracked group interface.

        When *ifname* is None, the tracked group interface is used, falling
        back to this object's own interface name. Raises Exception when the
        reply does not contain "OK".
        """
        if ifname is None:
            ifname = self.group_ifname if self.group_ifname else self.ifname
        if "OK" not in self.request("P2P_GROUP_REMOVE " + ifname):
            raise Exception("Group could not be removed")
        self.group_ifname = None

    def p2p_start_go(self, persistent=None, freq=None):
        """Send P2P_GROUP_ADD and wait up to 5 s for P2P-GROUP-STARTED.

        *persistent* None adds nothing, True adds " persistent", and any
        other value adds " persistent=<value>". A truthy *freq* (string or
        number) adds " freq=<freq>". Returns the group_form_result() dict.
        Raises Exception when the command is rejected or the event does not
        arrive.
        """
        self.dump_monitor()
        cmd = "P2P_GROUP_ADD"
        if persistent is None:
            pass
        elif persistent is True:
            cmd = cmd + " persistent"
        else:
            cmd = cmd + " persistent=" + str(persistent)
        if freq:
            # str() lets callers pass the frequency as an int as well.
            cmd = cmd + " freq=" + str(freq)
        if "OK" not in self.request(cmd):
            raise Exception("P2P_GROUP_ADD failed")
        ev = self.wait_event(["P2P-GROUP-STARTED"], timeout=5)
        if ev is None:
            raise Exception("GO start up timed out")
        self.dump_monitor()
        return self.group_form_result(ev)

    def p2p_go_authorize_client(self, pin):
        """Send "WPS_PIN any <pin>" via group_request(); raise on FAIL."""
        cmd = "WPS_PIN any " + pin
        if "FAIL" in self.group_request(cmd):
            raise Exception("Failed to authorize client connection on GO")
        return None

    def p2p_connect_group(self, go_addr, pin, timeout=0):
        """Discover *go_addr* and send P2P_CONNECT with the "join" keyword.

        With *timeout* equal to 0 the method returns None once the command
        is accepted; otherwise it waits for P2P-GROUP-STARTED and returns
        the group_form_result() dict. Raises Exception when the GO is not
        found, the command is rejected, or the wait times out.
        """
        self.dump_monitor()
        if not self.discover_peer(go_addr, social=False):
            raise Exception("GO " + go_addr + " not found")
        self.dump_monitor()
        cmd = "P2P_CONNECT " + go_addr + " " + pin + " join"
        if "OK" not in self.request(cmd):
            raise Exception("P2P_CONNECT(join) failed")
        if timeout == 0:
            self.dump_monitor()
            return None
        ev = self.wait_event(["P2P-GROUP-STARTED"], timeout)
        if ev is None:
            raise Exception("Joining the group timed out")
        self.dump_monitor()
        return self.group_form_result(ev)

    def tdls_setup(self, peer):
        """Send TDLS_SETUP for *peer* via group_request(); raise on FAIL."""
        cmd = "TDLS_SETUP " + peer
        if "FAIL" in self.group_request(cmd):
            raise Exception("Failed to request TDLS setup")
        return None

    def tdls_teardown(self, peer):
        """Send TDLS_TEARDOWN for *peer* via group_request(); raise on FAIL."""
        cmd = "TDLS_TEARDOWN " + peer
        if "FAIL" in self.group_request(cmd):
            raise Exception("Failed to request TDLS teardown")
        return None

    def connect(self, ssid, psk=None, proto=None, key_mgmt=None, wep_key0=None, ieee80211w=None):
        """Add a network for *ssid*, set the given fields and connect to it.

        Only arguments with a truthy value are written. ``ssid`` and ``psk``
        are sent quoted; the remaining fields are sent as given.
        """
        logger.info("Connect STA " + self.ifname + " to AP")
        netid = self.add_network()
        self.set_network_quoted(netid, "ssid", ssid)
        if psk:
            self.set_network_quoted(netid, "psk", psk)
        if proto:
            self.set_network(netid, "proto", proto)
        if key_mgmt:
            self.set_network(netid, "key_mgmt", key_mgmt)
        if ieee80211w:
            self.set_network(netid, "ieee80211w", ieee80211w)
        if wep_key0:
            self.set_network(netid, "wep_key0", wep_key0)
        self.connect_network(netid)

    def scan(self, type=None):
        """Trigger a scan and wait up to 15 s for CTRL-EVENT-SCAN-RESULTS.

        A truthy *type* is appended to the command as "TYPE=<type>". Raises
        Exception when the command is rejected or the event does not arrive.
        """
        if type:
            cmd = "SCAN TYPE=" + type
        else:
            cmd = "SCAN"
        self.dump_monitor()
        if "OK" not in self.request(cmd):
            raise Exception("Failed to trigger scan")
        ev = self.wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
        if ev is None:
            raise Exception("Scan timed out")

    def roam(self, bssid):
        """Send ROAM to *bssid* and wait up to 10 s for CTRL-EVENT-CONNECTED.

        The reply to ROAM itself is not checked. Raises Exception when the
        event does not arrive in time.
        """
        self.dump_monitor()
        self.request("ROAM " + bssid)
        ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=10)
        if ev is None:
            raise Exception("Roaming with the AP timed out")
        self.dump_monitor()