tests: Replace tcpdump with wlantest

This removes dependency on tcpdump by using an already included test
tool for capturing frames with Ethernet headers. There were some issues
in getting tcpdump working on Ubuntu 19.10, so this seems to be a clean
way of addressing that.

Signed-off-by: Jouni Malinen <j@w1.fi>
This commit is contained in:
Jouni Malinen 2019-12-27 23:17:43 +02:00
parent 420989085d
commit 4e9bcdebf3
6 changed files with 79 additions and 127 deletions

View file

@ -22,6 +22,7 @@ import hwsim_utils
from tshark import run_tshark
from wlantest import Wlantest
from wpasupplicant import WpaSupplicant
from wlantest import WlantestCapture
from test_ap_eap import check_eap_capa, check_domain_match_full
from test_gas import gas_rx, parse_gas, action_response, anqp_initial_resp, send_gas_resp, ACTION_CATEG_PUBLIC, GAS_INITIAL_RESPONSE
@ -4671,18 +4672,10 @@ def _test_proxyarp_open(dev, apdev, params, ebtables=False):
time.sleep(0.5)
cmd = {}
cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'ap-br0',
'-w', cap_br, '-s', '2000'],
stderr=open('/dev/null', 'w'))
cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[0].ifname,
'-w', cap_dev0, '-s', '2000'],
stderr=open('/dev/null', 'w'))
cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[1].ifname,
'-w', cap_dev1, '-s', '2000'],
stderr=open('/dev/null', 'w'))
cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[2].ifname,
'-w', cap_dev2, '-s', '2000'],
stderr=open('/dev/null', 'w'))
cmd[0] = WlantestCapture('ap-br0', cap_br)
cmd[1] = WlantestCapture(dev[0].ifname, cap_dev0)
cmd[2] = WlantestCapture(dev[1].ifname, cap_dev1)
cmd[3] = WlantestCapture(dev[2].ifname, cap_dev2)
dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
dev[1].connect("open", key_mgmt="NONE", scan_freq="2412")
@ -4875,7 +4868,7 @@ def _test_proxyarp_open(dev, apdev, params, ebtables=False):
dev[1].request("DISCONNECT")
time.sleep(1.5)
for i in range(len(cmd)):
cmd[i].terminate()
cmd[i].close()
time.sleep(0.1)
macs = get_bridge_macs("ap-br0")
logger.info("After disconnect (showmacs): " + str(macs))
@ -5019,18 +5012,10 @@ def _test_proxyarp_open_ipv6(dev, apdev, params, ebtables=False):
time.sleep(0.5)
cmd = {}
cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'ap-br0',
'-w', cap_br, '-s', '2000'],
stderr=open('/dev/null', 'w'))
cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[0].ifname,
'-w', cap_dev0, '-s', '2000'],
stderr=open('/dev/null', 'w'))
cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[1].ifname,
'-w', cap_dev1, '-s', '2000'],
stderr=open('/dev/null', 'w'))
cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[2].ifname,
'-w', cap_dev2, '-s', '2000'],
stderr=open('/dev/null', 'w'))
cmd[0] = WlantestCapture('ap-br0', cap_br)
cmd[1] = WlantestCapture(dev[0].ifname, cap_dev0)
cmd[2] = WlantestCapture(dev[1].ifname, cap_dev1)
cmd[3] = WlantestCapture(dev[2].ifname, cap_dev2)
dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
dev[1].connect("open", key_mgmt="NONE", scan_freq="2412")
@ -5134,7 +5119,7 @@ def _test_proxyarp_open_ipv6(dev, apdev, params, ebtables=False):
dev[1].request("DISCONNECT")
time.sleep(0.5)
for i in range(len(cmd)):
cmd[i].terminate()
cmd[i].close()
macs = get_bridge_macs("ap-br0")
logger.info("After disconnect (showmacs): " + str(macs))
matches = get_permanent_neighbors("ap-br0")

View file

@ -17,6 +17,7 @@ import hwsim_utils
from tshark import run_tshark
from utils import *
from wpasupplicant import WpaSupplicant
from wlantest import WlantestCapture
from test_ap_ht import set_world_reg
@remote_compatible
@ -911,10 +912,7 @@ def test_ap_open_layer_2_update(dev, apdev, params):
cap = os.path.join(params['logdir'], prefix + "." + ifname + ".pcap")
hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})
capture = subprocess.Popen(['tcpdump', '-p', '-U', '-i', ifname,
'-w', cap, '-s', '2000'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
wt = WlantestCapture(ifname, cap)
time.sleep(1)
dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
@ -923,11 +921,7 @@ def test_ap_open_layer_2_update(dev, apdev, params):
time.sleep(1)
hwsim_utils.test_connectivity(dev[0], hapd)
time.sleep(0.5)
capture.terminate()
res = capture.communicate()
logger.info("tcpdump stdout: " + res[0].decode())
logger.info("tcpdump stderr: " + res[1].decode())
time.sleep(0.5)
wt.close()
# Check for Layer 2 Update frame and unexpected frames from the station
# that did not fully complete authentication.

View file

@ -23,6 +23,7 @@ from utils import HwsimSkip, fail_test, skip_with_fips, start_monitor, stop_moni
import hwsim_utils
from wpasupplicant import WpaSupplicant
from tshark import run_tshark
from wlantest import WlantestCapture
def check_mib(dev, vals):
mib = dev.get_mib()
@ -3219,10 +3220,7 @@ def test_ap_wpa2_psk_inject_assoc(dev, apdev, params):
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
params["wpa_key_mgmt"] = "WPA-PSK"
hapd = hostapd.add_ap(apdev[0], params)
capture = subprocess.Popen(['tcpdump', '-p', '-U', '-i', ifname,
'-w', cap, '-s', '2000'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
wt = WlantestCapture(ifname, cap)
time.sleep(1)
bssid = hapd.own_addr().replace(':', '')
@ -3260,10 +3258,7 @@ def test_ap_wpa2_psk_inject_assoc(dev, apdev, params):
time.sleep(1)
hwsim_utils.test_connectivity(dev[0], hapd)
time.sleep(0.5)
capture.terminate()
res = capture.communicate()
logger.info("tcpdump stdout: " + res[0].decode())
logger.info("tcpdump stderr: " + res[1].decode())
wt.close()
time.sleep(0.5)
# Check for Layer 2 Update frame and unexpected frames from the station

View file

@ -21,6 +21,7 @@ import hwsim_utils
from hwsim import HWSimRadio
from utils import HwsimSkip, alloc_fail, fail_test, wait_fail_trigger
from wpasupplicant import WpaSupplicant
from wlantest import WlantestCapture
try:
import OpenSSL
@ -4480,9 +4481,7 @@ def run_dpp_controller_relay(dev, apdev, params):
prefix = "dpp_controller_relay"
cap_lo = os.path.join(params['logdir'], prefix + ".lo.pcap")
cmd = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'lo',
'-w', cap_lo, '-s', '2000'],
stderr=open('/dev/null', 'w'))
wt = WlantestCapture('lo', cap_lo)
# Controller
conf_id = dev[1].dpp_configurator_add()
@ -4526,7 +4525,7 @@ def run_dpp_controller_relay(dev, apdev, params):
dev[0].wait_connected()
time.sleep(0.5)
cmd.terminate()
wt.close()
def test_dpp_tcp(dev, apdev, params):
"""DPP over TCP"""
@ -4550,9 +4549,7 @@ def run_dpp_tcp(dev, apdev, cap_lo, port=None):
check_dpp_capab(dev[0])
check_dpp_capab(dev[1])
cmd = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'lo',
'-w', cap_lo, '-s', '2000'],
stderr=open('/dev/null', 'w'))
wt = WlantestCapture('lo', cap_lo)
time.sleep(1)
# Controller
@ -4583,7 +4580,7 @@ def run_dpp_tcp(dev, apdev, cap_lo, port=None):
allow_enrollee_failure=True,
allow_configurator_failure=True)
time.sleep(0.5)
cmd.terminate()
wt.close()
def test_dpp_tcp_controller_start_failure(dev, apdev, params):
"""DPP Controller startup failure"""

View file

@ -16,6 +16,7 @@ import hostapd
from wpasupplicant import WpaSupplicant
import hwsim_utils
from utils import HwsimSkip, alloc_fail, fail_test, wait_fail_trigger
from wlantest import WlantestCapture
def cleanup_macsec():
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5', monitor=False)
@ -275,14 +276,8 @@ def run_macsec_psk(dev, apdev, params, prefix, integ_only=False, port0=None,
subprocess.check_call(["ip", "link", "set", "dev", "veth%d" % i, "up"])
cmd = {}
cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth0',
'-w', cap_veth0, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth1',
'-w', cap_veth1, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
cmd[0] = WlantestCapture('veth0', cap_veth0)
cmd[1] = WlantestCapture('veth1', cap_veth1)
wpa = add_wpas_interfaces()
wpas0 = wpa[0]
@ -307,17 +302,11 @@ def run_macsec_psk(dev, apdev, params, prefix, integ_only=False, port0=None,
if expect_failure:
for i in range(len(cmd)):
cmd[i].terminate()
cmd[i].close()
return
cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname0,
'-w', cap_macsec0, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname1,
'-w', cap_macsec1, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
cmd[2] = WlantestCapture(macsec_ifname0, cap_macsec0)
cmd[3] = WlantestCapture(macsec_ifname1, cap_macsec1)
time.sleep(0.5)
mi0 = wpas0.get_status_field("mi")
@ -353,7 +342,7 @@ def run_macsec_psk(dev, apdev, params, prefix, integ_only=False, port0=None,
time.sleep(1)
for i in range(len(cmd)):
cmd[i].terminate()
cmd[i].close()
def cleanup_macsec_br(count):
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5', monitor=False)
@ -591,16 +580,8 @@ def run_macsec_psk_ns(dev, apdev, params):
"up"])
cmd = {}
cmd[0] = subprocess.Popen(['ip', 'netns', 'exec', 'ns0',
'tcpdump', '-p', '-U', '-i', 'veth0',
'-w', cap_veth0, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
cmd[1] = subprocess.Popen(['ip', 'netns', 'exec', 'ns1',
'tcpdump', '-p', '-U', '-i', 'veth1',
'-w', cap_veth1, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
cmd[0] = WlantestCapture('veth0', cap_veth0, netns='ns0')
cmd[1] = WlantestCapture('veth1', cap_veth1, netns='ns1')
write_conf(conffile + '0')
write_conf(conffile + '1', mka_priority=100)
@ -661,16 +642,8 @@ def run_macsec_psk_ns(dev, apdev, params):
break
time.sleep(1)
cmd[2] = subprocess.Popen(['ip', 'netns', 'exec', 'ns0',
'tcpdump', '-p', '-U', '-i', macsec_ifname0,
'-w', cap_macsec0, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
cmd[3] = subprocess.Popen(['ip', 'netns', 'exec', 'ns0',
'tcpdump', '-p', '-U', '-i', macsec_ifname1,
'-w', cap_macsec1, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
cmd[2] = WlantestCapture(macsec_ifname0, cap_macsec0, netns='ns0')
cmd[3] = WlantestCapture(macsec_ifname1, cap_macsec1, netns='ns0')
time.sleep(0.5)
logger.info("wpas0 STATUS:\n" + wpas0.request("STATUS"))
@ -708,7 +681,7 @@ def run_macsec_psk_ns(dev, apdev, params):
time.sleep(1)
for i in range(len(cmd)):
cmd[i].terminate()
cmd[i].close()
def test_macsec_psk_fail_cp(dev, apdev):
"""MACsec PSK local failures in CP state machine"""
@ -769,14 +742,8 @@ def run_macsec_hostapd_psk(dev, apdev, params, prefix, integ_only=False,
subprocess.check_call(["ip", "link", "set", "dev", "veth%d" % i, "up"])
cmd = {}
cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth0',
'-w', cap_veth0, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth1',
'-w', cap_veth1, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
cmd[0] = WlantestCapture('veth0', cap_veth0)
cmd[1] = WlantestCapture('veth1', cap_veth1)
wpa = add_wpas_interfaces(count=1)
wpas0 = wpa[0]
@ -816,20 +783,14 @@ def run_macsec_hostapd_psk(dev, apdev, params, prefix, integ_only=False,
if expect_failure:
for i in range(len(cmd)):
cmd[i].terminate()
cmd[i].close()
return
macsec_ifname0 = wpas0.get_driver_status_field("parent_ifname")
macsec_ifname1 = hapd.get_driver_status_field("parent_ifname")
cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname0,
'-w', cap_macsec0, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname1,
'-w', cap_macsec1, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
cmd[2] = WlantestCapture(macsec_ifname0, cap_macsec0)
cmd[3] = WlantestCapture(macsec_ifname1, cap_macsec1)
time.sleep(0.5)
logger.info("wpas0 MIB:\n" + wpas0.request("MIB"))
@ -843,7 +804,7 @@ def run_macsec_hostapd_psk(dev, apdev, params, prefix, integ_only=False,
time.sleep(1)
for i in range(len(cmd)):
cmd[i].terminate()
cmd[i].close()
def test_macsec_hostapd_eap(dev, apdev, params):
"""MACsec EAP with hostapd"""
@ -865,14 +826,8 @@ def run_macsec_hostapd_eap(dev, apdev, params, prefix, integ_only=False,
subprocess.check_call(["ip", "link", "set", "dev", "veth%d" % i, "up"])
cmd = {}
cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth0',
'-w', cap_veth0, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth1',
'-w', cap_veth1, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
cmd[0] = WlantestCapture('veth0', cap_veth0)
cmd[1] = WlantestCapture('veth1', cap_veth1)
wpa = add_wpas_interfaces(count=1)
wpas0 = wpa[0]
@ -911,20 +866,14 @@ def run_macsec_hostapd_eap(dev, apdev, params, prefix, integ_only=False,
if expect_failure:
for i in range(len(cmd)):
cmd[i].terminate()
cmd[i].close()
return
macsec_ifname0 = wpas0.get_driver_status_field("parent_ifname")
macsec_ifname1 = hapd.get_driver_status_field("parent_ifname")
cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname0,
'-w', cap_macsec0, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname1,
'-w', cap_macsec1, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
cmd[2] = WlantestCapture(macsec_ifname0, cap_macsec0)
cmd[3] = WlantestCapture(macsec_ifname1, cap_macsec1)
time.sleep(0.5)
logger.info("wpas0 MIB:\n" + wpas0.request("MIB"))
@ -938,4 +887,4 @@ def run_macsec_hostapd_eap(dev, apdev, params, prefix, integ_only=False,
time.sleep(1)
for i in range(len(cmd)):
cmd[i].terminate()
cmd[i].close()

View file

@ -1,5 +1,5 @@
# Python class for controlling wlantest
# Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
# Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi>
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
@ -242,3 +242,35 @@ class Wlantest:
tx[tid] = self.get_tx_tid(bssid, addr, tid)
rx[tid] = self.get_rx_tid(bssid, addr, tid)
return [tx, rx]
class WlantestCapture:
def __init__(self, ifname, output, netns=None):
self.cmd = None
self.ifname = ifname
if os.path.isfile('../../wlantest/wlantest'):
bin = '../../wlantest/wlantest'
else:
bin = 'wlantest'
logger.debug("wlantest[%s] starting" % ifname)
args = [bin, '-e', '-i', ifname, '-w', output]
if netns:
args = ['ip', 'netns', 'exec', netns] + args
self.cmd = subprocess.Popen(args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
def __del__(self):
if self.cmd:
self.close()
def close(self):
logger.debug("wlantest[%s] stopping" % self.ifname)
self.cmd.terminate()
res = self.cmd.communicate()
if len(res[0]) > 0:
logger.debug("wlantest[%s] stdout: %s" % (self.ifname,
res[0].decode().strip()))
if len(res[1]) > 0:
logger.debug("wlantest[%s] stderr: %s" % (self.ifname,
res[1].decode().strip()))
self.cmd = None