tests: More WPS ER HTTP protocol testing

Signed-off-by: Jouni Malinen <j@w1.fi>
This commit is contained in:
Jouni Malinen 2015-08-19 01:36:48 +03:00
parent 0dae8c9974
commit 4c3ae1c0d1

View file

@ -3002,21 +3002,8 @@ def _test_ap_wps_er_ssdp_proto(dev, apdev):
wps_event_url = None
class WPSAPHTTPServer(SocketServer.StreamRequestHandler):
def handle(self):
data = self.rfile.readline().strip()
logger.info("HTTP server received: " + data)
while True:
hdr = self.rfile.readline().strip()
if len(hdr) == 0:
break
logger.info("HTTP header: " + hdr)
if "CALLBACK:" in hdr:
global wps_event_url
wps_event_url = hdr.split(' ')[1].strip('<>')
if "GET /foo.xml" in data:
payload = '''<?xml version="1.0"?>
def gen_upnp_info(eventSubURL='wps_event'):
payload = '''<?xml version="1.0"?>
<root xmlns="urn:schemas-upnp-org:device-1-0">
<specVersion>
<major>1</major>
@ -3036,22 +3023,24 @@ class WPSAPHTTPServer(SocketServer.StreamRequestHandler):
<serviceId>urn:wifialliance-org:serviceId:WFAWLANConfig1</serviceId>
<SCPDURL>wps_scpd.xml</SCPDURL>
<controlURL>wps_control</controlURL>
<eventSubURL>wps_event</eventSubURL>
</service>
'''
if eventSubURL:
payload += '<eventSubURL>' + eventSubURL + '</eventSubURL>'
payload += '''</service>
</serviceList>
</device>
</root>
'''
hdr = 'HTTP/1.1 200 OK\r\n' + \
'Content-Type: text/xml; charset="utf-8"\r\n' + \
'Server: Unspecified, UPnP/1.0, Unspecified\r\n' + \
'Connection: close\r\n' + \
'Content-Length: ' + str(len(payload)) + '\r\n' + \
'Date: Sat, 15 Aug 2015 18:55:08 GMT\r\n\r\n'
self.wfile.write(hdr + payload)
hdr = 'HTTP/1.1 200 OK\r\n' + \
'Content-Type: text/xml; charset="utf-8"\r\n' + \
'Server: Unspecified, UPnP/1.0, Unspecified\r\n' + \
'Connection: close\r\n' + \
'Content-Length: ' + str(len(payload)) + '\r\n' + \
'Date: Sat, 15 Aug 2015 18:55:08 GMT\r\n\r\n'
return hdr + payload
if "POST /wps_control" in data:
payload = '''<?xml version="1.0"?>
def gen_wps_control():
payload = '''<?xml version="1.0"?>
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<s:Body>
<u:GetDeviceInfoResponse xmlns:u="urn:schemas-wifialliance-org:service:WFAWLANConfig:1">
@ -3068,25 +3057,89 @@ AAYANyoAASA=
</s:Body>
</s:Envelope>
'''
hdr = 'HTTP/1.1 200 OK\r\n' + \
'Content-Type: text/xml; charset="utf-8"\r\n' + \
'Server: Unspecified, UPnP/1.0, Unspecified\r\n' + \
'Connection: close\r\n' + \
'Content-Length: ' + str(len(payload)) + '\r\n' + \
'Date: Sat, 15 Aug 2015 18:55:08 GMT\r\n\r\n'
self.wfile.write(hdr + payload)
hdr = 'HTTP/1.1 200 OK\r\n' + \
'Content-Type: text/xml; charset="utf-8"\r\n' + \
'Server: Unspecified, UPnP/1.0, Unspecified\r\n' + \
'Connection: close\r\n' + \
'Content-Length: ' + str(len(payload)) + '\r\n' + \
'Date: Sat, 15 Aug 2015 18:55:08 GMT\r\n\r\n'
return hdr + payload
def gen_wps_event():
payload = ""
hdr = 'HTTP/1.1 200 OK\r\n' + \
'Content-Type: text/xml; charset="utf-8"\r\n' + \
'Server: Unspecified, UPnP/1.0, Unspecified\r\n' + \
'Connection: close\r\n' + \
'Content-Length: ' + str(len(payload)) + '\r\n' + \
'SID: uuid:7eb3342a-8a5f-47fe-a585-0785bfec6d8a\r\n' + \
'Timeout: Second-1801\r\n' + \
'Date: Sat, 15 Aug 2015 18:55:08 GMT\r\n\r\n'
return hdr + payload
class WPSAPHTTPServer(SocketServer.StreamRequestHandler):
def handle(self):
data = self.rfile.readline().strip()
logger.info("HTTP server received: " + data)
while True:
hdr = self.rfile.readline().strip()
if len(hdr) == 0:
break
logger.info("HTTP header: " + hdr)
if "CALLBACK:" in hdr:
global wps_event_url
wps_event_url = hdr.split(' ')[1].strip('<>')
if "GET /foo.xml" in data:
self.wfile.write(gen_upnp_info())
if "POST /wps_control" in data:
self.wfile.write(gen_wps_control())
if "SUBSCRIBE /wps_event" in data:
payload = ""
hdr = 'HTTP/1.1 200 OK\r\n' + \
'Content-Type: text/xml; charset="utf-8"\r\n' + \
'Server: Unspecified, UPnP/1.0, Unspecified\r\n' + \
'Connection: close\r\n' + \
'Content-Length: ' + str(len(payload)) + '\r\n' + \
'SID: uuid:7eb3342a-8a5f-47fe-a585-0785bfec6d8a\r\n' + \
'Timeout: Second-1801\r\n' + \
'Date: Sat, 15 Aug 2015 18:55:08 GMT\r\n\r\n'
self.wfile.write(hdr + payload)
self.wfile.write(gen_wps_event())
class MyTCPServer(SocketServer.TCPServer):
def __init__(self, addr, handler):
self.allow_reuse_address = True
SocketServer.TCPServer.__init__(self, addr, handler)
def wps_er_start(dev, http_server):
socket.setdefaulttimeout(1)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("239.255.255.250", 1900))
dev.request("WPS_ER_START ifname=lo")
(msg,addr) = sock.recvfrom(1000)
logger.debug("Received SSDP message from %s: %s" % (str(addr), msg))
if "M-SEARCH" not in msg:
raise Exception("Not an M-SEARCH")
# Add an AP with a valid URL and server listing to it
server = MyTCPServer(("127.0.0.1", 12345), http_server)
sock.sendto("HTTP/1.1 200 OK\r\nST: urn:schemas-wifialliance-org:device:WFADevice:1\r\nlocation:http://127.0.0.1:12345/foo.xml\r\ncache-control:max-age=1\r\n\r\n", addr)
server.timeout = 1
return server,sock
def wps_er_stop(dev, sock, server, on_alloc_fail=False):
sock.close()
server.server_close()
if on_alloc_fail:
done = False
for i in range(50):
res = dev.request("GET_ALLOC_FAIL")
if res.startswith("0:"):
done = True
break
time.sleep(0.1)
if not done:
raise Exception("No allocation failure reported")
else:
ev = dev.wait_event(["WPS-ER-AP-REMOVE"], timeout=5)
if ev is None:
raise Exception("No WPS-ER-AP-REMOVE event on max-age timeout")
dev.request("WPS_ER_STOP")
def test_ap_wps_er_http_proto(dev, apdev):
"""WPS ER HTTP protocol testing"""
@ -3097,20 +3150,7 @@ def test_ap_wps_er_http_proto(dev, apdev):
def _test_ap_wps_er_http_proto(dev, apdev):
uuid = '27ea801a-9e5c-4e73-bd82-f89cbcd10d7e'
socket.setdefaulttimeout(1)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("239.255.255.250", 1900))
dev[0].request("WPS_ER_START ifname=lo")
(msg,addr) = sock.recvfrom(1000)
logger.debug("Received SSDP message from %s: %s" % (str(addr), msg))
if "M-SEARCH" not in msg:
raise Exception("Not an M-SEARCH")
# Add an AP with a valid URL and server listing to it
server = SocketServer.TCPServer(("127.0.0.1", 12345), WPSAPHTTPServer)
sock.sendto("HTTP/1.1 200 OK\r\nST: urn:schemas-wifialliance-org:device:WFADevice:1\r\nlocation:http://127.0.0.1:12345/foo.xml\r\ncache-control:max-age=1\r\n\r\n", addr)
server.timeout = 1
server,sock = wps_er_start(dev[0], WPSAPHTTPServer)
global wps_event_url
wps_event_url = None
server.handle_request()
@ -3197,3 +3237,81 @@ RGV2aWNlIEEQSQAGADcqAAEg
conn = httplib.HTTPConnection(url.netloc)
conn.request("FOOBAR", '/foobar/123456789/123', payload, headers)
time.sleep(0.5)
class WPSAPHTTPServer2(SocketServer.StreamRequestHandler):
def handle(self):
data = self.rfile.readline().strip()
logger.info("HTTP server received: " + data)
while True:
hdr = self.rfile.readline().strip()
if len(hdr) == 0:
break
logger.info("HTTP header: " + hdr)
if "GET /foo.xml" in data:
self.wfile.write(gen_upnp_info(eventSubURL=None))
if "POST /wps_control" in data:
self.wfile.write(gen_wps_control())
def test_ap_wps_er_http_proto_no_event_sub_url(dev, apdev):
"""WPS ER HTTP protocol testing - no eventSubURL"""
try:
_test_ap_wps_er_http_proto_no_event_sub_url(dev, apdev)
finally:
dev[0].request("WPS_ER_STOP")
def _test_ap_wps_er_http_proto_no_event_sub_url(dev, apdev):
server,sock = wps_er_start(dev[0], WPSAPHTTPServer2)
server.handle_request()
server.handle_request()
wps_er_stop(dev[0], sock, server)
class WPSAPHTTPServer3(SocketServer.StreamRequestHandler):
def handle(self):
data = self.rfile.readline().strip()
logger.info("HTTP server received: " + data)
while True:
hdr = self.rfile.readline().strip()
if len(hdr) == 0:
break
logger.info("HTTP header: " + hdr)
if "GET /foo.xml" in data:
self.wfile.write(gen_upnp_info(eventSubURL='http://example.com/wps_event'))
if "POST /wps_control" in data:
self.wfile.write(gen_wps_control())
def test_ap_wps_er_http_proto_event_sub_url_dns(dev, apdev):
"""WPS ER HTTP protocol testing - DNS name in eventSubURL"""
try:
_test_ap_wps_er_http_proto_event_sub_url_dns(dev, apdev)
finally:
dev[0].request("WPS_ER_STOP")
def _test_ap_wps_er_http_proto_event_sub_url_dns(dev, apdev):
server,sock = wps_er_start(dev[0], WPSAPHTTPServer3)
server.handle_request()
server.handle_request()
wps_er_stop(dev[0], sock, server)
def test_ap_wps_er_http_proto_subscribe_oom(dev, apdev):
"""WPS ER HTTP protocol testing - subscribe OOM"""
try:
_test_ap_wps_er_http_proto_subscribe_oom(dev, apdev)
finally:
dev[0].request("WPS_ER_STOP")
def _test_ap_wps_er_http_proto_subscribe_oom(dev, apdev):
tests = [ (1, "http_client_url_parse"),
(1, "wpabuf_alloc;wps_er_subscribe"),
(1, "http_client_addr"),
(1, "eloop_register_sock;http_client_addr"),
(1, "eloop_register_timeout;http_client_addr") ]
for count,func in tests:
with alloc_fail(dev[0], count, func):
server,sock = wps_er_start(dev[0], WPSAPHTTPServer)
server.handle_request()
server.handle_request()
wps_er_stop(dev[0], sock, server, on_alloc_fail=True)