From 4e9bcdebf3caa592b0a53dc563ffa11024ffd545 Mon Sep 17 00:00:00 2001 From: Jouni Malinen Date: Fri, 27 Dec 2019 23:17:43 +0200 Subject: [PATCH] tests: Replace tcpdump with wlantest This removes dependency on tcpdump by using an already included test tool for capturing frames with Ethernet headers. There were some issues in getting tcpdump working on Ubuntu 19.10, so this seems to be a clean way of addressing that. Signed-off-by: Jouni Malinen --- tests/hwsim/test_ap_hs20.py | 37 +++++--------- tests/hwsim/test_ap_open.py | 12 ++--- tests/hwsim/test_ap_psk.py | 11 ++--- tests/hwsim/test_dpp.py | 13 ++--- tests/hwsim/test_macsec.py | 99 +++++++++---------------------------- tests/hwsim/wlantest.py | 34 ++++++++++++- 6 files changed, 79 insertions(+), 127 deletions(-) diff --git a/tests/hwsim/test_ap_hs20.py b/tests/hwsim/test_ap_hs20.py index 1c4c0fa33..c33e0dcae 100644 --- a/tests/hwsim/test_ap_hs20.py +++ b/tests/hwsim/test_ap_hs20.py @@ -22,6 +22,7 @@ import hwsim_utils from tshark import run_tshark from wlantest import Wlantest from wpasupplicant import WpaSupplicant +from wlantest import WlantestCapture from test_ap_eap import check_eap_capa, check_domain_match_full from test_gas import gas_rx, parse_gas, action_response, anqp_initial_resp, send_gas_resp, ACTION_CATEG_PUBLIC, GAS_INITIAL_RESPONSE @@ -4671,18 +4672,10 @@ def _test_proxyarp_open(dev, apdev, params, ebtables=False): time.sleep(0.5) cmd = {} - cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'ap-br0', - '-w', cap_br, '-s', '2000'], - stderr=open('/dev/null', 'w')) - cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[0].ifname, - '-w', cap_dev0, '-s', '2000'], - stderr=open('/dev/null', 'w')) - cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[1].ifname, - '-w', cap_dev1, '-s', '2000'], - stderr=open('/dev/null', 'w')) - cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[2].ifname, - '-w', cap_dev2, '-s', '2000'], - stderr=open('/dev/null', 'w')) + cmd[0] = WlantestCapture('ap-br0', cap_br) + cmd[1] = WlantestCapture(dev[0].ifname, cap_dev0) + cmd[2] = WlantestCapture(dev[1].ifname, cap_dev1) + cmd[3] = WlantestCapture(dev[2].ifname, cap_dev2) dev[0].connect("open", key_mgmt="NONE", scan_freq="2412") dev[1].connect("open", key_mgmt="NONE", scan_freq="2412") @@ -4875,7 +4868,7 @@ def _test_proxyarp_open(dev, apdev, params, ebtables=False): dev[1].request("DISCONNECT") time.sleep(1.5) for i in range(len(cmd)): - cmd[i].terminate() + cmd[i].close() time.sleep(0.1) macs = get_bridge_macs("ap-br0") logger.info("After disconnect (showmacs): " + str(macs)) @@ -5019,18 +5012,10 @@ def _test_proxyarp_open_ipv6(dev, apdev, params, ebtables=False): time.sleep(0.5) cmd = {} - cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'ap-br0', - '-w', cap_br, '-s', '2000'], - stderr=open('/dev/null', 'w')) - cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[0].ifname, - '-w', cap_dev0, '-s', '2000'], - stderr=open('/dev/null', 'w')) - cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[1].ifname, - '-w', cap_dev1, '-s', '2000'], - stderr=open('/dev/null', 'w')) - cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[2].ifname, - '-w', cap_dev2, '-s', '2000'], - stderr=open('/dev/null', 'w')) + cmd[0] = WlantestCapture('ap-br0', cap_br) + cmd[1] = WlantestCapture(dev[0].ifname, cap_dev0) + cmd[2] = WlantestCapture(dev[1].ifname, cap_dev1) + cmd[3] = WlantestCapture(dev[2].ifname, cap_dev2) dev[0].connect("open", key_mgmt="NONE", scan_freq="2412") dev[1].connect("open", key_mgmt="NONE", scan_freq="2412") @@ -5134,7 +5119,7 @@ def _test_proxyarp_open_ipv6(dev, apdev, params, ebtables=False): dev[1].request("DISCONNECT") time.sleep(0.5) for i in range(len(cmd)): - cmd[i].terminate() + cmd[i].close() macs = get_bridge_macs("ap-br0") logger.info("After disconnect (showmacs): " + str(macs)) matches = get_permanent_neighbors("ap-br0") diff --git a/tests/hwsim/test_ap_open.py b/tests/hwsim/test_ap_open.py index dd479d833..0d83438c4 100644 --- a/tests/hwsim/test_ap_open.py +++ b/tests/hwsim/test_ap_open.py @@ -17,6 +17,7 @@ import hwsim_utils from tshark import run_tshark from utils import * from wpasupplicant import WpaSupplicant +from wlantest import WlantestCapture from test_ap_ht import set_world_reg @remote_compatible @@ -911,10 +912,7 @@ def test_ap_open_layer_2_update(dev, apdev, params): cap = os.path.join(params['logdir'], prefix + "." + ifname + ".pcap") hapd = hostapd.add_ap(apdev[0], {"ssid": "open"}) - capture = subprocess.Popen(['tcpdump', '-p', '-U', '-i', ifname, - '-w', cap, '-s', '2000'], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + wt = WlantestCapture(ifname, cap) time.sleep(1) dev[0].connect("open", key_mgmt="NONE", scan_freq="2412") @@ -923,11 +921,7 @@ def test_ap_open_layer_2_update(dev, apdev, params): time.sleep(1) hwsim_utils.test_connectivity(dev[0], hapd) time.sleep(0.5) - capture.terminate() - res = capture.communicate() - logger.info("tcpdump stdout: " + res[0].decode()) - logger.info("tcpdump stderr: " + res[1].decode()) - time.sleep(0.5) + wt.close() # Check for Layer 2 Update frame and unexpected frames from the station # that did not fully complete authentication. diff --git a/tests/hwsim/test_ap_psk.py b/tests/hwsim/test_ap_psk.py index d32b11cbf..472264c69 100644 --- a/tests/hwsim/test_ap_psk.py +++ b/tests/hwsim/test_ap_psk.py @@ -23,6 +23,7 @@ from utils import HwsimSkip, fail_test, skip_with_fips, start_monitor, stop_moni import hwsim_utils from wpasupplicant import WpaSupplicant from tshark import run_tshark +from wlantest import WlantestCapture def check_mib(dev, vals): mib = dev.get_mib() @@ -3219,10 +3220,7 @@ def test_ap_wpa2_psk_inject_assoc(dev, apdev, params): params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678") params["wpa_key_mgmt"] = "WPA-PSK" hapd = hostapd.add_ap(apdev[0], params) - capture = subprocess.Popen(['tcpdump', '-p', '-U', '-i', ifname, - '-w', cap, '-s', '2000'], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + wt = WlantestCapture(ifname, cap) time.sleep(1) bssid = hapd.own_addr().replace(':', '') @@ -3260,10 +3258,7 @@ def test_ap_wpa2_psk_inject_assoc(dev, apdev, params): time.sleep(1) hwsim_utils.test_connectivity(dev[0], hapd) time.sleep(0.5) - capture.terminate() - res = capture.communicate() - logger.info("tcpdump stdout: " + res[0].decode()) - logger.info("tcpdump stderr: " + res[1].decode()) + wt.close() time.sleep(0.5) # Check for Layer 2 Update frame and unexpected frames from the station diff --git a/tests/hwsim/test_dpp.py b/tests/hwsim/test_dpp.py index 2e87c1564..7527b6c14 100644 --- a/tests/hwsim/test_dpp.py +++ b/tests/hwsim/test_dpp.py @@ -21,6 +21,7 @@ import hwsim_utils from hwsim import HWSimRadio from utils import HwsimSkip, alloc_fail, fail_test, wait_fail_trigger from wpasupplicant import WpaSupplicant +from wlantest import WlantestCapture try: import OpenSSL @@ -4480,9 +4481,7 @@ def run_dpp_controller_relay(dev, apdev, params): prefix = "dpp_controller_relay" cap_lo = os.path.join(params['logdir'], prefix + ".lo.pcap") - cmd = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'lo', - '-w', cap_lo, '-s', '2000'], - stderr=open('/dev/null', 'w')) + wt = WlantestCapture('lo', cap_lo) # Controller conf_id = dev[1].dpp_configurator_add() @@ -4526,7 +4525,7 @@ def run_dpp_controller_relay(dev, apdev, params): dev[0].wait_connected() time.sleep(0.5) - cmd.terminate() + wt.close() def test_dpp_tcp(dev, apdev, params): """DPP over TCP""" @@ -4550,9 +4549,7 @@ def run_dpp_tcp(dev, apdev, cap_lo, port=None): check_dpp_capab(dev[0]) check_dpp_capab(dev[1]) - cmd = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'lo', - '-w', cap_lo, '-s', '2000'], - stderr=open('/dev/null', 'w')) + wt = WlantestCapture('lo', cap_lo) time.sleep(1) # Controller @@ -4583,7 +4580,7 @@ def run_dpp_tcp(dev, apdev, cap_lo, port=None): allow_enrollee_failure=True, allow_configurator_failure=True) time.sleep(0.5) - cmd.terminate() + wt.close() def test_dpp_tcp_controller_start_failure(dev, apdev, params): """DPP Controller startup failure""" diff --git a/tests/hwsim/test_macsec.py b/tests/hwsim/test_macsec.py index 8ef4fec61..e521c6b3d 100644 --- a/tests/hwsim/test_macsec.py +++ b/tests/hwsim/test_macsec.py @@ -16,6 +16,7 @@ import hostapd from wpasupplicant import WpaSupplicant import hwsim_utils from utils import HwsimSkip, alloc_fail, fail_test, wait_fail_trigger +from wlantest import WlantestCapture def cleanup_macsec(): wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5', monitor=False) @@ -275,14 +276,8 @@ def run_macsec_psk(dev, apdev, params, prefix, integ_only=False, port0=None, subprocess.check_call(["ip", "link", "set", "dev", "veth%d" % i, "up"]) cmd = {} - cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth0', - '-w', cap_veth0, '-s', '2000', - '--immediate-mode'], - stderr=open('/dev/null', 'w')) - cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth1', - '-w', cap_veth1, '-s', '2000', - '--immediate-mode'], - stderr=open('/dev/null', 'w')) + cmd[0] = WlantestCapture('veth0', cap_veth0) + cmd[1] = WlantestCapture('veth1', cap_veth1) wpa = add_wpas_interfaces() wpas0 = wpa[0] @@ -307,17 +302,11 @@ def run_macsec_psk(dev, apdev, params, prefix, integ_only=False, port0=None, if expect_failure: for i in range(len(cmd)): - cmd[i].terminate() + cmd[i].close() return - cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname0, - '-w', cap_macsec0, '-s', '2000', - '--immediate-mode'], - stderr=open('/dev/null', 'w')) - cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname1, - '-w', cap_macsec1, '-s', '2000', - '--immediate-mode'], - stderr=open('/dev/null', 'w')) + cmd[2] = WlantestCapture(macsec_ifname0, cap_macsec0) + cmd[3] = WlantestCapture(macsec_ifname1, cap_macsec1) time.sleep(0.5) mi0 = wpas0.get_status_field("mi") @@ -353,7 +342,7 @@ def run_macsec_psk(dev, apdev, params, prefix, integ_only=False, port0=None, time.sleep(1) for i in range(len(cmd)): - cmd[i].terminate() + cmd[i].close() def cleanup_macsec_br(count): wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5', monitor=False) @@ -591,16 +580,8 @@ def run_macsec_psk_ns(dev, apdev, params): "up"]) cmd = {} - cmd[0] = subprocess.Popen(['ip', 'netns', 'exec', 'ns0', - 'tcpdump', '-p', '-U', '-i', 'veth0', - '-w', cap_veth0, '-s', '2000', - '--immediate-mode'], - stderr=open('/dev/null', 'w')) - cmd[1] = subprocess.Popen(['ip', 'netns', 'exec', 'ns1', - 'tcpdump', '-p', '-U', '-i', 'veth1', - '-w', cap_veth1, '-s', '2000', - '--immediate-mode'], - stderr=open('/dev/null', 'w')) + cmd[0] = WlantestCapture('veth0', cap_veth0, netns='ns0') + cmd[1] = WlantestCapture('veth1', cap_veth1, netns='ns1') write_conf(conffile + '0') write_conf(conffile + '1', mka_priority=100) @@ -661,16 +642,8 @@ def run_macsec_psk_ns(dev, apdev, params): break time.sleep(1) - cmd[2] = subprocess.Popen(['ip', 'netns', 'exec', 'ns0', - 'tcpdump', '-p', '-U', '-i', macsec_ifname0, - '-w', cap_macsec0, '-s', '2000', - '--immediate-mode'], - stderr=open('/dev/null', 'w')) - cmd[3] = subprocess.Popen(['ip', 'netns', 'exec', 'ns0', - 'tcpdump', '-p', '-U', '-i', macsec_ifname1, - '-w', cap_macsec1, '-s', '2000', - '--immediate-mode'], - stderr=open('/dev/null', 'w')) + cmd[2] = WlantestCapture(macsec_ifname0, cap_macsec0, netns='ns0') + cmd[3] = WlantestCapture(macsec_ifname1, cap_macsec1, netns='ns0') time.sleep(0.5) logger.info("wpas0 STATUS:\n" + wpas0.request("STATUS")) @@ -708,7 +681,7 @@ def run_macsec_psk_ns(dev, apdev, params): time.sleep(1) for i in range(len(cmd)): - cmd[i].terminate() + cmd[i].close() def test_macsec_psk_fail_cp(dev, apdev): """MACsec PSK local failures in CP state machine""" @@ -769,14 +742,8 @@ def run_macsec_hostapd_psk(dev, apdev, params, prefix, integ_only=False, subprocess.check_call(["ip", "link", "set", "dev", "veth%d" % i, "up"]) cmd = {} - cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth0', - '-w', cap_veth0, '-s', '2000', - '--immediate-mode'], - stderr=open('/dev/null', 'w')) - cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth1', - '-w', cap_veth1, '-s', '2000', - '--immediate-mode'], - stderr=open('/dev/null', 'w')) + cmd[0] = WlantestCapture('veth0', cap_veth0) + cmd[1] = WlantestCapture('veth1', cap_veth1) wpa = add_wpas_interfaces(count=1) wpas0 = wpa[0] @@ -816,20 +783,14 @@ def run_macsec_hostapd_psk(dev, apdev, params, prefix, integ_only=False, if expect_failure: for i in range(len(cmd)): - cmd[i].terminate() + cmd[i].close() return macsec_ifname0 = wpas0.get_driver_status_field("parent_ifname") macsec_ifname1 = hapd.get_driver_status_field("parent_ifname") - cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname0, - '-w', cap_macsec0, '-s', '2000', - '--immediate-mode'], - stderr=open('/dev/null', 'w')) - cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname1, - '-w', cap_macsec1, '-s', '2000', - '--immediate-mode'], - stderr=open('/dev/null', 'w')) + cmd[2] = WlantestCapture(macsec_ifname0, cap_macsec0) + cmd[3] = WlantestCapture(macsec_ifname1, cap_macsec1) time.sleep(0.5) logger.info("wpas0 MIB:\n" + wpas0.request("MIB")) @@ -843,7 +804,7 @@ def run_macsec_hostapd_psk(dev, apdev, params, prefix, integ_only=False, time.sleep(1) for i in range(len(cmd)): - cmd[i].terminate() + cmd[i].close() def test_macsec_hostapd_eap(dev, apdev, params): """MACsec EAP with hostapd""" @@ -865,14 +826,8 @@ def run_macsec_hostapd_eap(dev, apdev, params, prefix, integ_only=False, subprocess.check_call(["ip", "link", "set", "dev", "veth%d" % i, "up"]) cmd = {} - cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth0', - '-w', cap_veth0, '-s', '2000', - '--immediate-mode'], - stderr=open('/dev/null', 'w')) - cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth1', - '-w', cap_veth1, '-s', '2000', - '--immediate-mode'], - stderr=open('/dev/null', 'w')) + cmd[0] = WlantestCapture('veth0', cap_veth0) + cmd[1] = WlantestCapture('veth1', cap_veth1) wpa = add_wpas_interfaces(count=1) wpas0 = wpa[0] @@ -911,20 +866,14 @@ def run_macsec_hostapd_eap(dev, apdev, params, prefix, integ_only=False, if expect_failure: for i in range(len(cmd)): - cmd[i].terminate() + cmd[i].close() return macsec_ifname0 = wpas0.get_driver_status_field("parent_ifname") macsec_ifname1 = hapd.get_driver_status_field("parent_ifname") - cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname0, - '-w', cap_macsec0, '-s', '2000', - '--immediate-mode'], - stderr=open('/dev/null', 'w')) - cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname1, - '-w', cap_macsec1, '-s', '2000', - '--immediate-mode'], - stderr=open('/dev/null', 'w')) + cmd[2] = WlantestCapture(macsec_ifname0, cap_macsec0) + cmd[3] = WlantestCapture(macsec_ifname1, cap_macsec1) time.sleep(0.5) logger.info("wpas0 MIB:\n" + wpas0.request("MIB")) @@ -938,4 +887,4 @@ def run_macsec_hostapd_eap(dev, apdev, params, prefix, integ_only=False, time.sleep(1) for i in range(len(cmd)): - cmd[i].terminate() + cmd[i].close() diff --git a/tests/hwsim/wlantest.py b/tests/hwsim/wlantest.py index 5bb268f85..b0e817913 100644 --- a/tests/hwsim/wlantest.py +++ b/tests/hwsim/wlantest.py @@ -1,5 +1,5 @@ # Python class for controlling wlantest -# Copyright (c) 2013-2014, Jouni Malinen +# Copyright (c) 2013-2019, Jouni Malinen # # This software may be distributed under the terms of the BSD license. # See README for more details. @@ -242,3 +242,35 @@ class Wlantest: tx[tid] = self.get_tx_tid(bssid, addr, tid) rx[tid] = self.get_rx_tid(bssid, addr, tid) return [tx, rx] + +class WlantestCapture: + def __init__(self, ifname, output, netns=None): + self.cmd = None + self.ifname = ifname + if os.path.isfile('../../wlantest/wlantest'): + bin = '../../wlantest/wlantest' + else: + bin = 'wlantest' + logger.debug("wlantest[%s] starting" % ifname) + args = [bin, '-e', '-i', ifname, '-w', output] + if netns: + args = ['ip', 'netns', 'exec', netns] + args + self.cmd = subprocess.Popen(args, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + def __del__(self): + if self.cmd: + self.close() + + def close(self): + logger.debug("wlantest[%s] stopping" % self.ifname) + self.cmd.terminate() + res = self.cmd.communicate() + if len(res[0]) > 0: + logger.debug("wlantest[%s] stdout: %s" % (self.ifname, + res[0].decode().strip())) + if len(res[1]) > 0: + logger.debug("wlantest[%s] stderr: %s" % (self.ifname, + res[1].decode().strip())) + self.cmd = None