dpp-nfc: Add test mode for negotiated connection handover
Allow all actual DPP processing steps in wpa_supplicant to be skipped by specifying hardcoded URI values. Also allow a hardcoded crn to be specified to force specific handover requestor/selector roles. Signed-off-by: Jouni Malinen <jouni@codeaurora.org>
This commit is contained in:
parent
730fc307b1
commit
6eaee933d7
1 changed files with 88 additions and 41 deletions
|
@ -248,7 +248,12 @@ def dpp_handover_client(llc, alt=False):
|
||||||
if alt:
|
if alt:
|
||||||
global altchanlist
|
global altchanlist
|
||||||
chan_override = altchanlist
|
chan_override = altchanlist
|
||||||
uri = wpas_get_nfc_uri(start_listen=False, chan_override=chan_override)
|
global test_uri, test_alt_uri
|
||||||
|
if test_uri:
|
||||||
|
summary("TEST MODE: Using specified URI (alt=%s)" % str(alt))
|
||||||
|
uri = test_alt_uri if alt else test_uri
|
||||||
|
else:
|
||||||
|
uri = wpas_get_nfc_uri(start_listen=False, chan_override=chan_override)
|
||||||
if uri is None:
|
if uri is None:
|
||||||
summary("Cannot start handover client - no bootstrap URI available",
|
summary("Cannot start handover client - no bootstrap URI available",
|
||||||
color=C_RED)
|
color=C_RED)
|
||||||
|
@ -256,7 +261,14 @@ def dpp_handover_client(llc, alt=False):
|
||||||
uri = ndef.UriRecord(uri)
|
uri = ndef.UriRecord(uri)
|
||||||
summary("NFC URI record for DPP: " + str(uri))
|
summary("NFC URI record for DPP: " + str(uri))
|
||||||
carrier = ndef.Record('application/vnd.wfa.dpp', 'A', uri.data)
|
carrier = ndef.Record('application/vnd.wfa.dpp', 'A', uri.data)
|
||||||
crn = os.urandom(2)
|
global test_crn
|
||||||
|
if test_crn:
|
||||||
|
prev, = struct.unpack('>H', test_crn)
|
||||||
|
summary("TEST MODE: Use specified crn %d" % prev)
|
||||||
|
crn = test_crn
|
||||||
|
test_crn = struct.pack('>H', prev + 0x10)
|
||||||
|
else:
|
||||||
|
crn = os.urandom(2)
|
||||||
hr = ndef.HandoverRequestRecord(version="1.4", crn=crn)
|
hr = ndef.HandoverRequestRecord(version="1.4", crn=crn)
|
||||||
hr.add_alternative_carrier('active', carrier.name)
|
hr.add_alternative_carrier('active', carrier.name)
|
||||||
message = [hr, carrier]
|
message = [hr, carrier]
|
||||||
|
@ -338,6 +350,9 @@ def dpp_handover_client(llc, alt=False):
|
||||||
dpp_found = True
|
dpp_found = True
|
||||||
uri = carrier.data[1:].decode("utf-8")
|
uri = carrier.data[1:].decode("utf-8")
|
||||||
summary("DPP URI: " + uri)
|
summary("DPP URI: " + uri)
|
||||||
|
if test_uri:
|
||||||
|
summary("TEST MODE: Fake processing")
|
||||||
|
break
|
||||||
res = wpas_report_handover_sel(uri)
|
res = wpas_report_handover_sel(uri)
|
||||||
if res is None or "FAIL" in res:
|
if res is None or "FAIL" in res:
|
||||||
summary("DPP handover report rejected", color=C_RED)
|
summary("DPP handover report rejected", color=C_RED)
|
||||||
|
@ -467,10 +482,20 @@ class HandoverServer(nfc.handover.HandoverServer):
|
||||||
uri = carrier.data[1:].decode("utf-8")
|
uri = carrier.data[1:].decode("utf-8")
|
||||||
summary("Received DPP URI: " + uri)
|
summary("Received DPP URI: " + uri)
|
||||||
|
|
||||||
data = wpas_get_nfc_uri(start_listen=False, pick_channel=True)
|
global test_uri, test_alt_uri
|
||||||
|
if test_uri:
|
||||||
|
summary("TEST MODE: Using specified URI")
|
||||||
|
data = test_sel_uri if test_sel_uri else test_uri
|
||||||
|
else:
|
||||||
|
data = wpas_get_nfc_uri(start_listen=False,
|
||||||
|
pick_channel=True)
|
||||||
summary("Own URI (pre-processing): %s" % data)
|
summary("Own URI (pre-processing): %s" % data)
|
||||||
|
|
||||||
res = wpas_report_handover_req(uri)
|
if test_uri:
|
||||||
|
summary("TEST MODE: Fake processing")
|
||||||
|
res = "OK"
|
||||||
|
else:
|
||||||
|
res = wpas_report_handover_req(uri)
|
||||||
if res is None or "FAIL" in res:
|
if res is None or "FAIL" in res:
|
||||||
summary("DPP handover request processing failed",
|
summary("DPP handover request processing failed",
|
||||||
color=C_RED)
|
color=C_RED)
|
||||||
|
@ -481,48 +506,54 @@ class HandoverServer(nfc.handover.HandoverServer):
|
||||||
summary("Own URI (try another channel list): %s" % data)
|
summary("Own URI (try another channel list): %s" % data)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
if test_alt_uri:
|
||||||
|
summary("TEST MODE: Reject initial proposal")
|
||||||
|
continue
|
||||||
|
|
||||||
found = True
|
found = True
|
||||||
|
|
||||||
wpas = wpas_connect()
|
if not test_uri:
|
||||||
if wpas is None:
|
wpas = wpas_connect()
|
||||||
continue
|
if wpas is None:
|
||||||
global own_id
|
continue
|
||||||
data = wpas.request("DPP_BOOTSTRAP_GET_URI %d" % own_id).rstrip()
|
global own_id
|
||||||
if "FAIL" in data:
|
data = wpas.request("DPP_BOOTSTRAP_GET_URI %d" % own_id).rstrip()
|
||||||
continue
|
if "FAIL" in data:
|
||||||
|
continue
|
||||||
summary("Own URI (post-processing): %s" % data)
|
summary("Own URI (post-processing): %s" % data)
|
||||||
uri = ndef.UriRecord(data)
|
uri = ndef.UriRecord(data)
|
||||||
summary("Own bootstrapping NFC URI record: " + str(uri))
|
summary("Own bootstrapping NFC URI record: " + str(uri))
|
||||||
|
|
||||||
info = wpas.request("DPP_BOOTSTRAP_INFO %d" % own_id)
|
if not test_uri:
|
||||||
freq = None
|
info = wpas.request("DPP_BOOTSTRAP_INFO %d" % own_id)
|
||||||
for line in info.splitlines():
|
freq = None
|
||||||
if line.startswith("use_freq="):
|
for line in info.splitlines():
|
||||||
freq = int(line.split('=')[1])
|
if line.startswith("use_freq="):
|
||||||
if freq is None or freq == 0:
|
freq = int(line.split('=')[1])
|
||||||
summary("No channel negotiated over NFC - use channel 6")
|
if freq is None or freq == 0:
|
||||||
freq = 2437
|
summary("No channel negotiated over NFC - use channel 6")
|
||||||
else:
|
freq = 2437
|
||||||
summary("Negotiated channel: %d MHz" % freq)
|
else:
|
||||||
if get_status_field(wpas, "bssid[0]"):
|
summary("Negotiated channel: %d MHz" % freq)
|
||||||
summary("Own AP freq: %s MHz" % str(get_status_field(wpas, "freq")))
|
if get_status_field(wpas, "bssid[0]"):
|
||||||
if get_status_field(wpas, "beacon_set", extra="DRIVER") is None:
|
summary("Own AP freq: %s MHz" % str(get_status_field(wpas, "freq")))
|
||||||
summary("Enable beaconing to have radio ready for RX")
|
if get_status_field(wpas, "beacon_set", extra="DRIVER") is None:
|
||||||
wpas.request("DISABLE")
|
summary("Enable beaconing to have radio ready for RX")
|
||||||
wpas.request("SET start_disabled 0")
|
wpas.request("DISABLE")
|
||||||
wpas.request("ENABLE")
|
wpas.request("SET start_disabled 0")
|
||||||
cmd = "DPP_LISTEN %d" % freq
|
wpas.request("ENABLE")
|
||||||
global enrollee_only
|
cmd = "DPP_LISTEN %d" % freq
|
||||||
global configurator_only
|
global enrollee_only
|
||||||
if enrollee_only:
|
global configurator_only
|
||||||
cmd += " role=enrollee"
|
if enrollee_only:
|
||||||
elif configurator_only:
|
cmd += " role=enrollee"
|
||||||
cmd += " role=configurator"
|
elif configurator_only:
|
||||||
summary(cmd)
|
cmd += " role=configurator"
|
||||||
res = wpas.request(cmd)
|
summary(cmd)
|
||||||
if "OK" not in res:
|
res = wpas.request(cmd)
|
||||||
summary("Failed to start DPP listen", color=C_RED)
|
if "OK" not in res:
|
||||||
break
|
summary("Failed to start DPP listen", color=C_RED)
|
||||||
|
break
|
||||||
|
|
||||||
carrier = ndef.Record('application/vnd.wfa.dpp', 'A', uri.data)
|
carrier = ndef.Record('application/vnd.wfa.dpp', 'A', uri.data)
|
||||||
summary("Own DPP carrier record: " + str(carrier))
|
summary("Own DPP carrier record: " + str(carrier))
|
||||||
|
@ -802,6 +833,14 @@ def main():
|
||||||
parser.add_argument('--chan', default=None, help='channel list')
|
parser.add_argument('--chan', default=None, help='channel list')
|
||||||
parser.add_argument('--altchan', default=None, help='alternative channel list')
|
parser.add_argument('--altchan', default=None, help='alternative channel list')
|
||||||
parser.add_argument('--netrole', default=None, help='netrole for Enrollee')
|
parser.add_argument('--netrole', default=None, help='netrole for Enrollee')
|
||||||
|
parser.add_argument('--test-uri', default=None,
|
||||||
|
help='test mode: initial URI')
|
||||||
|
parser.add_argument('--test-alt-uri', default=None,
|
||||||
|
help='test mode: alternative URI')
|
||||||
|
parser.add_argument('--test-sel-uri', default=None,
|
||||||
|
help='test mode: handover select URI')
|
||||||
|
parser.add_argument('--test-crn', default=None,
|
||||||
|
help='test mode: hardcoded crn')
|
||||||
parser.add_argument('command', choices=['write-nfc-uri',
|
parser.add_argument('command', choices=['write-nfc-uri',
|
||||||
'write-nfc-hs'],
|
'write-nfc-hs'],
|
||||||
nargs='?')
|
nargs='?')
|
||||||
|
@ -814,10 +853,18 @@ def main():
|
||||||
global no_wait
|
global no_wait
|
||||||
no_wait = args.no_wait
|
no_wait = args.no_wait
|
||||||
|
|
||||||
global chanlist, altchanlist, netrole
|
global chanlist, altchanlist, netrole, test_uri, test_alt_uri, test_sel_uri
|
||||||
|
global test_crn
|
||||||
chanlist = args.chan
|
chanlist = args.chan
|
||||||
altchanlist = args.altchan
|
altchanlist = args.altchan
|
||||||
netrole = args.netrole
|
netrole = args.netrole
|
||||||
|
test_uri = args.test_uri
|
||||||
|
test_alt_uri = args.test_alt_uri
|
||||||
|
test_sel_uri = args.test_sel_uri
|
||||||
|
if args.test_crn:
|
||||||
|
test_crn = struct.pack('>H', int(args.test_crn))
|
||||||
|
else:
|
||||||
|
test_crn = None
|
||||||
|
|
||||||
logging.basicConfig(level=args.loglevel)
|
logging.basicConfig(level=args.loglevel)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue