diff --git a/tests/hwsim/hwsim_utils.py b/tests/hwsim/hwsim_utils.py index 42856fe9e..621143b05 100644 --- a/tests/hwsim/hwsim_utils.py +++ b/tests/hwsim/hwsim_utils.py @@ -11,7 +11,7 @@ import subprocess import logging logger = logging.getLogger() -def test_connectivity(ifname1, ifname2): +def test_connectivity(ifname1, ifname2, dscp=None, tos=None): if os.path.isfile("../../mac80211_hwsim/tools/hwsim_test"): hwsim_test = "../../mac80211_hwsim/tools/hwsim_test" else: @@ -20,6 +20,12 @@ def test_connectivity(ifname1, ifname2): hwsim_test, ifname1, ifname2] + if dscp: + cmd.append('-D') + cmd.append(str(dscp)) + elif tos: + cmd.append('-t') + cmd.append(str(tos)) try: s = subprocess.check_output(cmd) logger.debug(s) @@ -28,17 +34,17 @@ def test_connectivity(ifname1, ifname2): logger.info(e.output) raise -def test_connectivity_p2p(dev1, dev2): +def test_connectivity_p2p(dev1, dev2, dscp=None, tos=None): ifname1 = dev1.group_ifname if dev1.group_ifname else dev1.ifname ifname2 = dev2.group_ifname if dev2.group_ifname else dev2.ifname - test_connectivity(ifname1, ifname2) + test_connectivity(ifname1, ifname2, dscp, tos) -def test_connectivity_p2p_sta(dev1, dev2): +def test_connectivity_p2p_sta(dev1, dev2, dscp=None, tos=None): ifname1 = dev1.group_ifname if dev1.group_ifname else dev1.ifname ifname2 = dev2.ifname - test_connectivity(ifname1, ifname2) + test_connectivity(ifname1, ifname2, dscp, tos) -def test_connectivity_sta(dev1, dev2): +def test_connectivity_sta(dev1, dev2, dscp=None, tos=None): ifname1 = dev1.ifname ifname2 = dev2.ifname - test_connectivity(ifname1, ifname2) + test_connectivity(ifname1, ifname2, dscp, tos) diff --git a/tests/hwsim/test_ap_qosmap.py b/tests/hwsim/test_ap_qosmap.py index 6d0ecf23b..c67f6ad10 100644 --- a/tests/hwsim/test_ap_qosmap.py +++ b/tests/hwsim/test_ap_qosmap.py @@ -13,6 +13,21 @@ logger = logging.getLogger() import hwsim_utils import hostapd +from wlantest import Wlantest + +def check_qos_map(ap, dev, dscp, tid): + bssid = ap['bssid'] + sta = dev.p2p_interface_addr() + wt = Wlantest() + wt.clear_sta_counters(bssid, sta) + hwsim_utils.test_connectivity(dev.ifname, ap['ifname'], dscp=dscp) + [ tx, rx ] = wt.get_tid_counters(bssid, sta) + if tx[tid] == 0: + logger.info("Expected TX DSCP " + str(dscp) + " with TID " + str(tid) + " but counters: " + str(tx)) + raise Exception("No STA->AP data frame using the expected TID") + if rx[tid] == 0: + logger.info("Expected RX DSCP " + str(dscp) + " with TID " + str(tid) + " but counters: " + str(rx)) + raise Exception("No AP->STA data frame using the expected TID") def test_ap_qosmap(dev, apdev): """QoS mapping""" @@ -21,11 +36,38 @@ def test_ap_qosmap(dev, apdev): return "skip" ssid = "test-qosmap" params = { "ssid": ssid } - params['qos_map_set'] = '53,2,22,6,8,15,0,7,255,255,16,31,32,39,255,255,40,47,255,255' + params['qos_map_set'] = '53,2,22,6,8,15,0,7,255,255,16,31,32,39,255,255,40,47,48,55' hostapd.add_ap(apdev[0]['ifname'], params) dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412") - hwsim_utils.test_connectivity(dev[0].ifname, apdev[0]['ifname']) + check_qos_map(apdev[0], dev[0], 53, 2) + check_qos_map(apdev[0], dev[0], 22, 6) + check_qos_map(apdev[0], dev[0], 8, 0) + check_qos_map(apdev[0], dev[0], 15, 0) + check_qos_map(apdev[0], dev[0], 0, 1) + check_qos_map(apdev[0], dev[0], 7, 1) + check_qos_map(apdev[0], dev[0], 16, 3) + check_qos_map(apdev[0], dev[0], 31, 3) + check_qos_map(apdev[0], dev[0], 32, 4) + check_qos_map(apdev[0], dev[0], 39, 4) + check_qos_map(apdev[0], dev[0], 40, 6) + check_qos_map(apdev[0], dev[0], 47, 6) + check_qos_map(apdev[0], dev[0], 48, 7) + check_qos_map(apdev[0], dev[0], 55, 7) hapd = hostapd.Hostapd(apdev[0]['ifname']) - hapd.request("SET_QOS_MAP_SET 22,6,8,15,0,7,255,255,16,31,32,39,255,255,40,47,255,255") + hapd.request("SET_QOS_MAP_SET 22,6,8,15,0,7,255,255,16,31,32,39,255,255,40,47,48,55") hapd.request("SEND_QOS_MAP_CONF " + dev[0].get_status_field("address")) - hwsim_utils.test_connectivity(dev[0].ifname, apdev[0]['ifname']) + check_qos_map(apdev[0], dev[0], 53, 7) + check_qos_map(apdev[0], dev[0], 22, 6) + check_qos_map(apdev[0], dev[0], 48, 7) + check_qos_map(apdev[0], dev[0], 55, 7) + check_qos_map(apdev[0], dev[0], 56, 56 >> 3) + check_qos_map(apdev[0], dev[0], 63, 63 >> 3) + +def test_ap_qosmap_default(dev, apdev): + """QoS mapping with default values""" + ssid = "test-qosmap-default" + params = { "ssid": ssid } + hostapd.add_ap(apdev[0]['ifname'], params) + dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412") + for dscp in [ 0, 7, 8, 15, 16, 23, 24, 31, 32, 39, 40, 47, 48, 55, 56, 63]: + check_qos_map(apdev[0], dev[0], dscp, dscp >> 3) diff --git a/tests/hwsim/wlantest.py b/tests/hwsim/wlantest.py index c6dba12a7..57853fb41 100644 --- a/tests/hwsim/wlantest.py +++ b/tests/hwsim/wlantest.py @@ -63,6 +63,12 @@ class Wlantest: raise Exception("wlantest_cli command failed") return int(res) + def clear_sta_counters(self, bssid, addr): + res = subprocess.check_output([self.wlantest_cli, "clear_sta_counters", + bssid, addr]); + if "FAIL" in res: + raise Exception("wlantest_cli command failed") + def tdls_clear(self, bssid, addr1, addr2): res = subprocess.check_output([self.wlantest_cli, "clear_tdls_counters", bssid, addr1, addr2]); @@ -114,3 +120,25 @@ class Wlantest: res = self.info_sta("key_mgmt", bssid, addr) if key_mgmt not in res: raise Exception("Unexpected STA key_mgmt") + + def get_tx_tid(self, bssid, addr, tid): + res = subprocess.check_output([self.wlantest_cli, "get_tx_tid", + bssid, addr, str(tid)]); + if "FAIL" in res: + raise Exception("wlantest_cli command failed") + return int(res) + + def get_rx_tid(self, bssid, addr, tid): + res = subprocess.check_output([self.wlantest_cli, "get_rx_tid", + bssid, addr, str(tid)]); + if "FAIL" in res: + raise Exception("wlantest_cli command failed") + return int(res) + + def get_tid_counters(self, bssid, addr): + tx = {} + rx = {} + for tid in range(0, 17): + tx[tid] = self.get_tx_tid(bssid, addr, tid) + rx[tid] = self.get_rx_tid(bssid, addr, tid) + return [ tx, rx ]