# P2P protocol tests for various messages # Copyright (c) 2014-2015, Jouni Malinen # # This software may be distributed under the terms of the BSD license. # See README for more details. from remotehost import remote_compatible import binascii import struct import time import logging logger = logging.getLogger() import hostapd from p2p_utils import * from test_gas import anqp_adv_proto def ie_ssid(ssid): return struct.pack("HHH", WSC_ATTR_CONFIG_METHODS, 2, methods) def p2p_attr_status(status=P2P_SC_SUCCESS): return struct.pack("H", config_methods) + struct.pack("8BB", *t2) + struct.pack('>HH', 0x1011, len(name)) + name.encode() def p2p_attr_group_id(addr, ssid): val = struct.unpack('6B', binascii.unhexlify(addr.replace(':', ''))) t = (P2P_ATTR_GROUP_ID, 6 + len(ssid)) + val return struct.pack(' 2: (id, elen) = struct.unpack('BB', pos[0:2]) pos = pos[2:] if elen > len(pos): raise Exception("Truncated IE in P2P Public Action frame (elen=%d left=%d)" % (elen, len(pos))) if id == WLAN_EID_VENDOR_SPECIFIC: if elen < 4: raise Exception("Too short vendor specific IE in P2P Public Action frame (elen=%d)" % elen) (oui1, oui2, oui3, subtype) = struct.unpack('BBBB', pos[0:4]) if oui1 == 0x50 and oui2 == 0x6f and oui3 == 0x9a and subtype == 9: if 'p2p' in p2p: p2p['p2p'] += pos[4:elen] else: p2p['p2p'] = pos[4:elen] if oui1 == 0x00 and oui2 == 0x50 and oui3 == 0xf2 and subtype == 4: p2p['wsc'] = pos[4:elen] pos = pos[elen:] if len(pos) > 0: raise Exception("Invalid element in P2P Public Action frame") if 'p2p' in p2p: p2p['p2p_attrs'] = {} pos = p2p['p2p'] while len(pos) >= 3: (id, alen) = struct.unpack(' len(pos): logger.info("P2P payload: " + binascii.hexlify(p2p['p2p'])) raise Exception("Truncated P2P attribute in P2P Public Action frame (alen=%d left=%d p2p-payload=%d)" % (alen, len(pos), len(p2p['p2p']))) p2p['p2p_attrs'][id] = pos[0:alen] pos = pos[alen:] if P2P_ATTR_STATUS in p2p['p2p_attrs']: p2p['p2p_status'] = struct.unpack('B', p2p['p2p_attrs'][P2P_ATTR_STATUS])[0] if 'wsc' in p2p: p2p['wsc_attrs'] = {} pos = p2p['wsc'] while len(pos) >= 4: (id, alen) = struct.unpack('>HH', pos[0:4]) pos = pos[4:] if alen > len(pos): logger.info("WSC payload: " + binascii.hexlify(p2p['wsc'])) raise Exception("Truncated WSC attribute in P2P Public Action frame (alen=%d left=%d wsc-payload=%d)" % (alen, len(pos), len(p2p['wsc']))) p2p['wsc_attrs'][id] = pos[0:alen] pos = pos[alen:] return p2p @remote_compatible def test_p2p_msg_empty(dev, apdev): """P2P protocol test: empty P2P Public Action frame""" dst, src, hapd, channel = start_p2p(dev, apdev) msg = p2p_hdr(dst, src) hapd.mgmt_tx(msg) @remote_compatible def test_p2p_msg_long_ssid(dev, apdev): """P2P protocol test: Too long SSID in P2P Public Action frame""" dst, src, hapd, channel = start_p2p(dev, apdev) msg = p2p_hdr(dst, src, type=P2P_INVITATION_REQ, dialog_token=1) attrs = p2p_attr_config_timeout() attrs += p2p_attr_invitation_flags() attrs += p2p_attr_operating_channel() attrs += p2p_attr_group_bssid(src) attrs += p2p_attr_channel_list() attrs += p2p_attr_group_id(src, 'DIRECT-foo') attrs += p2p_attr_device_info(src, config_methods=0x0108) msg['payload'] += ie_p2p(attrs) msg['payload'] += ie_ssid(255 * 'A') hapd.mgmt_tx(msg) ev = dev[0].wait_global_event(["P2P-DEVICE-FOUND"], timeout=5) if ev is None: raise Exception("Timeout on device found event") @remote_compatible def test_p2p_msg_long_dev_name(dev, apdev): """P2P protocol test: Too long Device Name in P2P Public Action frame""" dst, src, hapd, channel = start_p2p(dev, apdev) msg = p2p_hdr(dst, src, type=P2P_INVITATION_REQ, dialog_token=1) attrs = p2p_attr_config_timeout() attrs += p2p_attr_invitation_flags() attrs += p2p_attr_operating_channel() attrs += p2p_attr_group_bssid(src) attrs += p2p_attr_channel_list() attrs += p2p_attr_group_id(src, 'DIRECT-foo') attrs += p2p_attr_device_info(src, config_methods=0x0108, name="123456789012345678901234567890123") msg['payload'] += ie_p2p(attrs) hapd.mgmt_tx(msg) ev = dev[0].wait_event(["P2P-DEVICE-FOUND"], timeout=0.1) if ev is not None: raise Exception("Unexpected device found event") def test_p2p_msg_invitation_req(dev, apdev): """P2P protocol tests for invitation request processing""" dst, src, hapd, channel = start_p2p(dev, apdev) # Empty P2P Invitation Request (missing dialog token) msg = p2p_hdr(dst, src, type=P2P_INVITATION_REQ, dialog_token=None) hapd.mgmt_tx(msg) dialog_token = 0 # Various p2p_parse() failure cases due to invalid attributes # Too short attribute header dialog_token += 1 msg = p2p_hdr(dst, src, type=P2P_INVITATION_REQ, dialog_token=dialog_token) attrs = struct.pack("L', 0x506f9a00) msg['payload'] += struct.pack('L', 0x506f9a09) msg['payload'] += struct.pack('L', 0x506f9a09) req += struct.pack('L', 0x506f9a09) req += struct.pack('