tests: DPP Relay and incomplete connections

Signed-off-by: Jouni Malinen <jouni@codeaurora.org>
This commit is contained in:
Jouni Malinen 2021-04-13 00:31:36 +03:00 committed by Jouni Malinen
parent c2d7b027b1
commit d961326f19

View file

@ -15,6 +15,10 @@ import socket
import struct import struct
import subprocess import subprocess
import time import time
try:
from socketserver import StreamRequestHandler, TCPServer
except ImportError:
from SocketServer import StreamRequestHandler, TCPServer
import hostapd import hostapd
import hwsim_utils import hwsim_utils
@ -5284,6 +5288,61 @@ def run_dpp_controller_relay(dev, apdev, params, chirp=False):
time.sleep(0.5) time.sleep(0.5)
wt.close() wt.close()
class MyTCPServer(TCPServer):
def __init__(self, addr, handler):
self.allow_reuse_address = True
TCPServer.__init__(self, addr, handler)
class DPPControllerServer(StreamRequestHandler):
def handle(self):
data = self.rfile.read()
# Do not reply
def test_dpp_relay_incomplete_connections(dev, apdev):
"""DPP Relay and incomplete connections"""
check_dpp_capab(dev[0], min_ver=2)
check_dpp_capab(dev[1], min_ver=2)
id_c = dev[1].dpp_bootstrap_gen()
uri_c = dev[1].request("DPP_BOOTSTRAP_GET_URI %d" % id_c)
res = dev[1].request("DPP_BOOTSTRAP_INFO %d" % id_c)
pkhash = None
for line in res.splitlines():
name, value = line.split('=')
if name == "pkhash":
pkhash = value
break
if not pkhash:
raise Exception("Could not fetch public key hash from Controller")
params = {"ssid": "unconfigured",
"channel": "6",
"dpp_controller": "ipaddr=127.0.0.1 pkhash=" + pkhash}
hapd = hostapd.add_ap(apdev[0], params)
check_dpp_capab(hapd)
server = MyTCPServer(("127.0.0.1", 8908), DPPControllerServer)
server.timeout = 30
hapd.set("ext_mgmt_frame_handling", "1")
dev[0].dpp_auth_init(uri=uri_c, role="enrollee")
msg = hapd.mgmt_rx()
if msg is None:
raise Exception("MGMT RX wait timed out")
dev[0].request("DPP_STOP_LISTEN")
frame = msg['frame']
for i in range(20):
if i == 14:
time.sleep(20)
addr = struct.pack('6B', 0x02, 0, 0, 0, 0, i)
tmp = frame[0:10] + addr + frame[16:]
hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + binascii.hexlify(tmp).decode())
ev = hapd.wait_event(["DPP-FAIL"], timeout=0.1)
if ev:
raise Exception("DPP relay failed [%d]: %s" % (i + 1, ev))
server.server_close()
def test_dpp_tcp(dev, apdev, params): def test_dpp_tcp(dev, apdev, params):
"""DPP over TCP""" """DPP over TCP"""
prefix = "dpp_tcp" prefix = "dpp_tcp"