# hwsim testing utilities
# Copyright (c) 2013-2014, Jouni Malinen
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.

import os
import subprocess
import time
import logging
logger = logging.getLogger()

from wpasupplicant import WpaSupplicant


def _data_test_request(dev, group, cmd):
    """Send cmd through dev.group_request() if group is set, else dev.request()."""
    if group:
        return dev.group_request(cmd)
    return dev.request(cmd)


def _wait_data_test_rx(dev, group, timeout):
    """Wait for a DATA-TEST-RX event on dev (group or regular event queue)."""
    if group:
        return dev.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
    return dev.wait_event(["DATA-TEST-RX"], timeout=timeout)


def _enable_data_test(dev, group, ifname):
    """Issue DATA_TEST_CONFIG 1 (optionally with ifname=) and require "OK"."""
    cmd = "DATA_TEST_CONFIG 1"
    if ifname:
        cmd = cmd + " ifname=" + ifname
    res = _data_test_request(dev, group, cmd)
    if "OK" not in res:
        raise Exception("Failed to enable data test functionality")


def _check_data_delivery(src, src_group, dst, dst_group, dst_addr, src_addr,
                         tos, timeout, label):
    """Send one test frame from src and verify that dst reports receiving it.

    label (e.g. "dev1->dev2 unicast") is only used to build the exception
    messages.
    """
    cmd = "DATA_TEST_TX {} {} {}".format(dst_addr, src_addr, tos)
    _data_test_request(src, src_group, cmd)
    ev = _wait_data_test_rx(dst, dst_group, timeout)
    if ev is None:
        raise Exception(label + " data delivery failed")
    if "DATA-TEST-RX {} {}".format(dst_addr, src_addr) not in ev:
        raise Exception("Unexpected " + label + " data result")


def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
                          ifname1=None, ifname2=None, config=True, timeout=5):
    """Run a single bidirectional data connectivity check between two devices.

    Four frames are exchanged in this order: dev1->dev2 unicast,
    dev1->dev2 broadcast, dev2->dev1 unicast, dev2->dev1 broadcast. Each one
    is sent with DATA_TEST_TX and must be reported by the peer in a
    DATA-TEST-RX event within timeout.

    dev1group/dev2group select the group request/event methods for the
    respective device. ifname1/ifname2 are appended as "ifname=" to the
    DATA_TEST_CONFIG 1 command. When config is false, the DATA_TEST_CONFIG
    enable/disable commands are skipped entirely.

    Raises Exception if enabling the data test fails, if an event does not
    arrive, or if the received event does not match the transmitted frame.
    """
    # For non-group WpaSupplicant instances the address reported by the
    # driver status replaces own_addr().
    addr1 = dev1.own_addr()
    if not dev1group and isinstance(dev1, WpaSupplicant):
        addr1 = dev1.get_driver_status_field('addr')

    addr2 = dev2.own_addr()
    if not dev2group and isinstance(dev2, WpaSupplicant):
        addr2 = dev2.get_driver_status_field('addr')

    # Flush pending events so that only events from this run are matched.
    dev1.dump_monitor()
    dev2.dump_monitor()

    broadcast = "ff:ff:ff:ff:ff:ff"
    try:
        if config:
            _enable_data_test(dev1, dev1group, ifname1)
            _enable_data_test(dev2, dev2group, ifname2)

        _check_data_delivery(dev1, dev1group, dev2, dev2group, addr2, addr1,
                             tos, timeout, "dev1->dev2 unicast")
        _check_data_delivery(dev1, dev1group, dev2, dev2group, broadcast,
                             addr1, tos, timeout, "dev1->dev2 broadcast")
        _check_data_delivery(dev2, dev2group, dev1, dev1group, addr1, addr2,
                             tos, timeout, "dev2->dev1 unicast")
        _check_data_delivery(dev2, dev2group, dev1, dev1group, broadcast,
                             addr2, tos, timeout, "dev2->dev1 broadcast")
    finally:
        # Always turn the data test functionality back off, even on failure.
        if config:
            _data_test_request(dev1, dev1group, "DATA_TEST_CONFIG 0")
            _data_test_request(dev2, dev2group, "DATA_TEST_CONFIG 0")


def test_connectivity(dev1, dev2, dscp=None, tos=None, max_tries=1,
                      dev1group=False, dev2group=False,
                      ifname1=None, ifname2=None, config=True, timeout=5):
    """Verify data connectivity between dev1 and dev2, retrying on failure.

    A non-zero dscp overrides tos with dscp << 2; a missing tos defaults
    to 0. run_connectivity_test() is attempted up to max_tries times with a
    one second pause between attempts.

    Raises Exception wrapping the last failure if no attempt succeeded.
    """
    if dscp:
        tos = dscp << 2
    if not tos:
        tos = 0

    success = False
    last_err = None
    for i in range(0, max_tries):
        try:
            run_connectivity_test(dev1, dev2, tos, dev1group, dev2group,
                                  ifname1, ifname2, config=config,
                                  timeout=timeout)
            success = True
            break
        except Exception as e:
            # "as" form is valid on Python 2.6+ and Python 3, unlike the
            # comma form.
            last_err = e
            if i + 1 < max_tries:
                time.sleep(1)
    if not success:
        raise Exception(last_err)


def test_connectivity_iface(dev1, dev2, ifname, dscp=None, tos=None,
                            max_tries=1):
    """Run test_connectivity() with ifname passed as dev2's ifname2."""
    test_connectivity(dev1, dev2, dscp, tos, ifname2=ifname,
                      max_tries=max_tries)


def test_connectivity_p2p(dev1, dev2, dscp=None, tos=None):
    """Run test_connectivity() using the group interface on both devices."""
    test_connectivity(dev1, dev2, dscp, tos, dev1group=True, dev2group=True)


def test_connectivity_p2p_sta(dev1, dev2, dscp=None, tos=None):
    """Run test_connectivity() with dev1 on its group interface only."""
    test_connectivity(dev1, dev2, dscp, tos, dev1group=True, dev2group=False)


def test_connectivity_sta(dev1, dev2, dscp=None, tos=None):
    """Run test_connectivity() with neither device in group mode."""
    test_connectivity(dev1, dev2, dscp, tos)


# Values 0..3; presumably the modes accepted by the hwsim "ps" debugfs file
# written by set_powersave() - confirm against the driver.
(PS_DISABLED, PS_ENABLED, PS_AUTO_POLL, PS_MANUAL_POLL) = range(4)


def set_powersave(dev, val):
    """Write val to the hwsim "ps" debugfs file of the phy used by dev."""
    phy = dev.get_driver_status_field("phyname")
    # Context manager closes the file even if the write fails.
    with open('/sys/kernel/debug/ieee80211/%s/hwsim/ps' % phy, 'w') as psf:
        psf.write('%d\n' % val)