 2ec82e67c1
			
		
	
	
		2ec82e67c1
		
	
	
	
	
		
			
			This adds new files with test cases to verify both the old and new wpa_supplicant D-Bus interface. Signed-off-by: Jouni Malinen <j@w1.fi>
		
			
				
	
	
		
			704 lines
		
	
	
	
		
			29 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			704 lines
		
	
	
	
		
			29 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # wpa_supplicant D-Bus old interface tests
 | |
| # Copyright (c) 2014, Jouni Malinen <j@w1.fi>
 | |
| #
 | |
| # This software may be distributed under the terms of the BSD license.
 | |
| # See README for more details.
 | |
| 
 | |
| import gobject
 | |
| import logging
 | |
| logger = logging.getLogger()
 | |
| 
 | |
| import hostapd
 | |
| from test_dbus import TestDbus, start_ap
 | |
| 
 | |
# D-Bus names used by the tests in this file for the old wpa_supplicant
# interface.

# Bus name passed to bus.get_object(); also used as the interface name for
# the root object's methods (getInterface, addInterface, ...).
WPAS_DBUS_OLD_SERVICE = "fi.epitest.hostap.WPASupplicant"
# Object path of the root object.
WPAS_DBUS_OLD_PATH = "/fi/epitest/hostap/WPASupplicant"
# Interface name used for methods and signals on a per-interface object.
WPAS_DBUS_OLD_IFACE = "fi.epitest.hostap.WPASupplicant.Interface"
# Interface name used for properties() on scan result (BSS) objects.
WPAS_DBUS_OLD_BSSID = "fi.epitest.hostap.WPASupplicant.BSSID"
# Interface name used for set/enable/disable on network objects.
WPAS_DBUS_OLD_NETWORK = "fi.epitest.hostap.WPASupplicant.Network"
 | |
| 
 | |
def prepare_dbus(dev):
    """Connect to the old wpa_supplicant D-Bus service for one interface.

    dev is a test station object; only its ifname attribute is used here.

    Returns a (dbus, bus, wpas_obj, path, if_obj) tuple: the imported dbus
    module, the system bus connection, the proxy for the service root
    object, the object path returned by getInterface() for dev.ifname, and
    the proxy object for that path.

    Raises Exception("hwsim-SKIP") if any step of the setup fails, which
    includes the dbus Python bindings not being importable.
    """
    try:
        # Imported here rather than at module level so that a missing dbus
        # module ends up in the except clause below instead of breaking
        # the import of this test file.
        import dbus
        from dbus.mainloop.glib import DBusGMainLoop
        DBusGMainLoop(set_as_default=True)
        bus = dbus.SystemBus()
        wpas_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, WPAS_DBUS_OLD_PATH)
        wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)
        path = wpas.getInterface(dev.ifname)
        if_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
        return (dbus, bus, wpas_obj, path, if_obj)
    except Exception as e:
        # Deliberately broad: any failure to reach the service is reported
        # as a skipped test rather than as a test failure.
        logger.info("No D-Bus support available: " + str(e))
        raise Exception("hwsim-SKIP")
 | |
| 
 | |
class TestDbusOldWps(TestDbus):
    """Main-loop driver shared by the old-interface WPS tests.

    Entering the context schedules self.run_wps and self.timeout, subscribes
    to the WpsCred signal, and runs the main loop. run_wps is not defined
    here; the subclasses in this file provide it. timeout, add_signal and
    loop are not defined here either (presumably inherited from TestDbus -
    confirm against test_dbus.py).
    """

    def __init__(self, bus):
        TestDbus.__init__(self, bus)
        # Set once a WpsCred signal has been delivered
        self.event_ok = False

    def __enter__(self):
        start_delay_ms, give_up_ms = 1, 15000
        gobject.timeout_add(start_delay_ms, self.run_wps)
        gobject.timeout_add(give_up_ms, self.timeout)
        self.add_signal(self.wpsCred, WPAS_DBUS_OLD_IFACE, "WpsCred")
        # Does not return until something calls self.loop.quit()
        self.loop.run()
        return self

    def wpsCred(self, cred):
        """WpsCred signal handler: log the credential and stop the loop."""
        message = "wpsCred: " + str(cred)
        logger.debug(message)
        self.event_ok = True
        self.loop.quit()

    def success(self):
        """Return whether the WpsCred signal was received."""
        return self.event_ok
 | |
| 
 | |
def test_dbus_old(dev, apdev):
    """The old D-Bus interface"""
    # The docstring above is kept as a single line on purpose; see the
    # comments below for what each step covers.
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    # Getters on the interface object: capabilities (default and with the
    # boolean argument set), state, and scanning.
    res = if_obj.capabilities(dbus_interface=WPAS_DBUS_OLD_IFACE)
    logger.debug("capabilities(): " + str(res))
    if 'auth_alg' not in res or "OPEN" not in res['auth_alg']:
        raise Exception("Unexpected capabilities")
    res2 = if_obj.capabilities(dbus.Boolean(True),
                               dbus_interface=WPAS_DBUS_OLD_IFACE)
    logger.debug("capabilities(strict): " + str(res2))
    res = if_obj.state(dbus_interface=WPAS_DBUS_OLD_IFACE)
    logger.debug("State: " + res)

    res = if_obj.scanning(dbus_interface=WPAS_DBUS_OLD_IFACE)
    if res != 0:
        raise Exception("Unexpected scanning: " + str(res))

    # A valid setAPScan() value followed by two invalid arguments
    if_obj.setAPScan(dbus.UInt32(1), dbus_interface=WPAS_DBUS_OLD_IFACE)

    for t in [dbus.UInt32(123), "foo"]:
        try:
            if_obj.setAPScan(t, dbus_interface=WPAS_DBUS_OLD_IFACE)
            raise Exception("Invalid setAPScan() accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidOptions" not in str(e):
                raise Exception("Unexpected error message for invalid setAPScan: " + str(e))

    # Network methods on paths that do not name an existing network
    for p in [path + "/Networks/12345",
              path + "/Networks/foo"]:
        obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, p)
        try:
            obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
            raise Exception("Invalid disable() accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidNetwork" not in str(e):
                raise Exception("Unexpected error message for invalid disable: " + str(e))

    # BSSID methods on paths that do not name an existing BSS entry
    for p in [path + "/BSSIDs/foo",
              path + "/BSSIDs/001122334455"]:
        obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, p)
        try:
            obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID)
            raise Exception("Invalid properties() accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidBSSID" not in str(e):
                raise Exception("Unexpected error message for invalid properties: " + str(e))
 | |
| 
 | |
def test_dbus_old_scan(dev, apdev):
    """The old D-Bus interface - scanning"""
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    # Two APs so that exactly two scan results are expected below
    hapd = hostapd.add_ap(apdev[0]['ifname'], {"ssid": "open"})

    params = hostapd.wpa2_eap_params(ssid="test-wpa2-eap")
    params['wpa'] = '3'
    hapd2 = hostapd.add_ap(apdev[1]['ifname'], params)

    class TestDbusScan(TestDbus):
        """Trigger a scan and wait for ScanResultsAvailable."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.scan_completed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_scan)
            gobject.timeout_add(7000, self.timeout)
            self.add_signal(self.scanDone, WPAS_DBUS_OLD_IFACE,
                            "ScanResultsAvailable")
            self.loop.run()
            return self

        def scanDone(self):
            logger.debug("scanDone")
            self.scan_completed = True
            self.loop.quit()

        def run_scan(self, *args):
            logger.debug("run_scan")
            if not if_obj.scan(dbus_interface=WPAS_DBUS_OLD_IFACE):
                raise Exception("Failed to trigger scan")
            # False so that the timeout source is not rescheduled
            return False

        def success(self):
            return self.scan_completed

    with TestDbusScan(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    res = if_obj.scanResults(dbus_interface=WPAS_DBUS_OLD_IFACE)
    if len(res) != 2:
        raise Exception("Unexpected number of scan results: " + str(res))
    for bss_path in res:
        logger.debug("Scan result BSS path: " + bss_path)
        bss_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, bss_path)
        bss = bss_obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID,
                                 byte_arrays=True)
        logger.debug("BSS: " + str(bss))

    # A method name that does not exist on a valid BSS object has to be
    # rejected. Only DBusException is caught here: catching Exception would
    # also swallow the failure raised inside the try block and make this
    # check impossible to fail.
    obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, res[0])
    try:
        obj.properties2(dbus_interface=WPAS_DBUS_OLD_BSSID)
        raise Exception("Unknown BSSID method accepted")
    except dbus.exceptions.DBusException as e:
        logger.debug("Unknown BSSID method exception: " + str(e))

    if not if_obj.flush(0, dbus_interface=WPAS_DBUS_OLD_IFACE):
        raise Exception("Failed to issue flush(0)")
    res = if_obj.scanResults(dbus_interface=WPAS_DBUS_OLD_IFACE)
    if len(res) != 0:
        raise Exception("Unexpected BSS entry after flush")
    if not if_obj.flush(1, dbus_interface=WPAS_DBUS_OLD_IFACE):
        raise Exception("Failed to issue flush(1)")
    try:
        if_obj.flush("foo", dbus_interface=WPAS_DBUS_OLD_IFACE)
        raise Exception("Invalid flush arguments accepted")
    except dbus.exceptions.DBusException as e:
        if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
            raise Exception("Unexpected error message for invalid flush: " + str(e))
    # bss_obj still refers to the last scan result from before the flush.
    # A successful call is tolerated here; only an unexpected error name is
    # treated as a failure.
    try:
        bss_obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID,
                           byte_arrays=True)
    except dbus.exceptions.DBusException as e:
        if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidBSSID"):
            raise Exception("Unexpected error message for invalid BSS: " + str(e))
 | |
| 
 | |
def test_dbus_old_debug(dev, apdev):
    """The old D-Bus interface - debug"""
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)

    # Too few arguments
    try:
        wpas.setDebugParams(123)
        raise Exception("Invalid setDebugParams accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidOptions" not in str(e):
            raise Exception("Unexpected error message for invalid setDebugParam: " + str(e))

    # Three arguments, but a first value that is expected to be rejected
    try:
        wpas.setDebugParams(123, True, True)
        raise Exception("Invalid setDebugParams accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidOptions" not in str(e):
            raise Exception("Unexpected error message for invalid setDebugParam: " + str(e))

    # Valid call, then set the log level through the control interface
    wpas.setDebugParams(1, True, True)
    dev[0].request("LOG_LEVEL MSGDUMP")
 | |
| 
 | |
def test_dbus_old_smartcard(dev, apdev):
    """The old D-Bus interface - smartcard"""
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    # An empty dictionary is expected to be accepted
    params = dbus.Dictionary(signature='sv')
    if_obj.setSmartcardModules(params, dbus_interface=WPAS_DBUS_OLD_IFACE)

    # Every dictionary below carries an extra foo* key next to the
    # *_path keys; each call is expected to fail with InvalidOptions, as
    # is the non-dictionary argument 1.
    params = dbus.Dictionary({'opensc_engine_path': "foobar1",
                              'pkcs11_engine_path': "foobar2",
                              'pkcs11_module_path': "foobar3",
                              'foo': 'bar'},
                             signature='sv')
    params2 = dbus.Dictionary({'pkcs11_engine_path': "foobar2",
                               'foo': 'bar'},
                              signature='sv')
    params3 = dbus.Dictionary({'pkcs11_module_path': "foobar3",
                               'foo2': 'bar'},
                              signature='sv')
    params4 = dbus.Dictionary({'opensc_engine_path': "foobar4",
                               'foo3': 'bar'},
                              signature='sv')
    tests = [1, params, params2, params3, params4]
    for t in tests:
        try:
            if_obj.setSmartcardModules(t, dbus_interface=WPAS_DBUS_OLD_IFACE)
            raise Exception("Invalid setSmartcardModules accepted: " + str(t))
        except dbus.exceptions.DBusException as e:
            if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
                raise Exception("Unexpected error message for invalid setSmartcardModules(%s): %s" % (str(t), str(e)))
 | |
| 
 | |
def test_dbus_old_interface(dev, apdev):
    """The old D-Bus interface - interface get/add/remove"""
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)

    # getInterface() with a wrong argument type and with an unknown name
    tests = [(123, "InvalidOptions"),
             ("foo", "InvalidInterface")]
    for (ifname, err) in tests:
        try:
            wpas.getInterface(ifname)
            raise Exception("Invalid getInterface accepted")
        except dbus.exceptions.DBusException as e:
            if err not in str(e):
                raise Exception("Unexpected error message for invalid getInterface: " + str(e))

    # Add and remove an interface; removing it a second time has to fail
    params = dbus.Dictionary({'driver': 'none'}, signature='sv')
    wpas.addInterface("lo", params)
    path = wpas.getInterface("lo")
    logger.debug("New interface path: " + str(path))
    wpas.removeInterface(path)
    try:
        wpas.removeInterface(path)
        raise Exception("Invalid removeInterface() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidInterface" not in str(e):
            raise Exception("Unexpected error message for invalid removeInterface: " + str(e))

    # addInterface() failure cases: (ifname, params or None, expected error)
    params1 = dbus.Dictionary({'driver': 'foo',
                               'driver-params': 'foo',
                               'config-file': 'foo',
                               'bridge-ifname': 'foo'},
                              signature='sv')
    params2 = dbus.Dictionary({'foo': 'bar'}, signature='sv')
    tests = [(123, None, "InvalidOptions"),
             ("", None, "InvalidOptions"),
             ("foo", None, "AddError"),
             ("foo", params1, "AddError"),
             ("foo", params2, "InvalidOptions"),
             ("foo", 1234, "InvalidOptions"),
             (dev[0].ifname, None, "ExistsError")]
    for (ifname, params, err) in tests:
        try:
            # None means "call without the optional second argument"
            if params is None:
                wpas.addInterface(ifname)
            else:
                wpas.addInterface(ifname, params)
            raise Exception("Invalid addInterface accepted: " + str(params))
        except dbus.exceptions.DBusException as e:
            if err not in str(e):
                raise Exception("Unexpected error message for invalid addInterface(%s): %s" % (str(params), str(e)))

    # removeInterface() with a wrong argument type
    try:
        wpas.removeInterface(123)
        raise Exception("Invalid removeInterface accepted")
    except dbus.exceptions.DBusException as e:
        if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
            raise Exception("Unexpected error message for invalid removeInterface: " + str(e))
 | |
| 
 | |
def test_dbus_old_blob(dev, apdev):
    """The old D-Bus interface - blob operations"""
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    # setBlobs() arguments that are expected to be rejected: a
    # non-dictionary, non-byte-array values, and an empty blob name.
    # param2 is built without an explicit signature, as in the original
    # test.
    param1 = dbus.Dictionary({'blob3': 123}, signature='sv')
    param2 = dbus.Dictionary({'blob3': "foo"})
    param3 = dbus.Dictionary({'': dbus.ByteArray([1, 2])},
                             signature='sv')
    tests = [(1, "InvalidOptions"),
             (param1, "InvalidOptions"),
             (param2, "InvalidOptions"),
             (param3, "InvalidOptions")]
    for (arg, err) in tests:
        try:
            if_obj.setBlobs(arg, dbus_interface=WPAS_DBUS_OLD_IFACE)
            raise Exception("Invalid setBlobs() accepted: " + str(arg))
        except dbus.exceptions.DBusException as e:
            logger.debug("setBlobs(%s): %s" % (str(arg), str(e)))
            if err not in str(e):
                raise Exception("Unexpected error message for invalid setBlobs: " + str(e))

    # removeBlobs() failure cases: (argument, expected error substring)
    tests = [(["foo"], "RemoveError: Error removing blob"),
             ([""], "RemoveError: Invalid blob name"),
             ([1], "InvalidOptions"),
             ("foo", "InvalidOptions")]
    for (arg, err) in tests:
        try:
            if_obj.removeBlobs(arg, dbus_interface=WPAS_DBUS_OLD_IFACE)
            raise Exception("Invalid removeBlobs() accepted: " + str(arg))
        except dbus.exceptions.DBusException as e:
            logger.debug("removeBlobs(%s): %s" % (str(arg), str(e)))
            if err not in str(e):
                raise Exception("Unexpected error message for invalid removeBlobs: " + str(e))

    # Valid set followed by removal of the same two blobs
    blobs = dbus.Dictionary({'blob1': dbus.ByteArray([1, 2, 3]),
                             'blob2': dbus.ByteArray([1, 2])},
                            signature='sv')
    if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE)
    if_obj.removeBlobs(['blob1', 'blob2'], dbus_interface=WPAS_DBUS_OLD_IFACE)
 | |
| 
 | |
def test_dbus_old_connect(dev, apdev):
    """The old D-Bus interface - add a network and connect"""
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0]['ifname'], params)

    # removeNetwork() with proxy objects for paths that are not networks
    # of this interface
    for p in ["/no/where/to/be/found",
              path + "/Networks/12345",
              path + "/Networks/foo",
              "/fi/epitest/hostap/WPASupplicant/Interfaces",
              "/fi/epitest/hostap/WPASupplicant/Interfaces/12345/Networks/0"]:
        obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, p)
        try:
            if_obj.removeNetwork(obj, dbus_interface=WPAS_DBUS_OLD_IFACE)
            raise Exception("Invalid removeNetwork accepted: " + p)
        except dbus.exceptions.DBusException as e:
            if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidNetwork"):
                raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))

    # removeNetwork() with a plain string that is not an object path
    try:
        if_obj.removeNetwork("foo", dbus_interface=WPAS_DBUS_OLD_IFACE)
        raise Exception("Invalid removeNetwork accepted")
    except dbus.exceptions.DBusException as e:
        if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
            raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))

    # removeNetwork() with the interface path itself
    try:
        if_obj.removeNetwork(path, dbus_interface=WPAS_DBUS_OLD_IFACE)
        raise Exception("Invalid removeNetwork accepted")
    except dbus.exceptions.DBusException as e:
        if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidNetwork"):
            raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))

    # selectNetwork() failure cases: (argument, expected error substring)
    tests = [(path, "InvalidNetwork"),
             (bus.get_object(WPAS_DBUS_OLD_SERVICE, "/no/where"),
              "InvalidInterface"),
             (bus.get_object(WPAS_DBUS_OLD_SERVICE, path + "/Networks/1234"),
              "InvalidNetwork"),
             (bus.get_object(WPAS_DBUS_OLD_SERVICE, path + "/Networks/foo"),
              "InvalidNetwork"),
             (1, "InvalidOptions")]
    for t, err in tests:
        try:
            if_obj.selectNetwork(t, dbus_interface=WPAS_DBUS_OLD_IFACE)
            raise Exception("Invalid selectNetwork accepted: " + str(t))
        except dbus.exceptions.DBusException as e:
            if err not in str(e):
                raise Exception("Unexpected error message for invalid selectNetwork(%s): %s" % (str(t), str(e)))

    # Add a network, exercise set() with invalid and valid arguments, and
    # remove the network again before the connection test below.
    npath = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
    if not npath.startswith(WPAS_DBUS_OLD_PATH):
        # Report the path that was actually returned (npath), not the
        # interface path.
        raise Exception("Unexpected addNetwork result: " + npath)
    netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, npath)
    tests = [123,
             dbus.Dictionary({'foo': 'bar'}, signature='sv')]
    for t in tests:
        try:
            netw_obj.set(t, dbus_interface=WPAS_DBUS_OLD_NETWORK)
            raise Exception("Invalid set() accepted: " + str(t))
        except dbus.exceptions.DBusException as e:
            if "InvalidOptions" not in str(e):
                raise Exception("Unexpected error message for invalid set: " + str(e))
    params = dbus.Dictionary({'ssid': ssid,
                              'key_mgmt': 'WPA-PSK',
                              'psk': passphrase,
                              'identity': dbus.ByteArray([1, 2]),
                              'priority': dbus.Int32(0),
                              'scan_freq': dbus.UInt32(2412)},
                             signature='sv')
    netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
    if_obj.removeNetwork(npath, dbus_interface=WPAS_DBUS_OLD_IFACE)

    class TestDbusConnect(TestDbus):
        """Drive a connect/disconnect sequence from StateChange signals.

        self.state advances as follows:
          0 -> 1  first COMPLETED: disable the network
          1 -> 2  DISCONNECTED: enable the network
          2 -> 3  COMPLETED: disconnect
          3 -> 4  DISCONNECTED: selectNetwork() without arguments
          4 -> 5  COMPLETED: disconnect
          5 -> 6  DISCONNECTED: selectNetwork(self.path)
          6 -> 7  COMPLETED: remove the network twice and quit the loop
        """

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.state = 0

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.scanDone, WPAS_DBUS_OLD_IFACE,
                            "ScanResultsAvailable")
            self.add_signal(self.stateChange, WPAS_DBUS_OLD_IFACE,
                            "StateChange")
            self.loop.run()
            return self

        def scanDone(self):
            logger.debug("scanDone")

        def stateChange(self, new, old):
            logger.debug("stateChange(%d): %s --> %s" % (self.state, old, new))
            if new == "COMPLETED":
                if self.state == 0:
                    self.state = 1
                    self.netw_obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
                elif self.state == 2:
                    self.state = 3
                    if_obj.disconnect(dbus_interface=WPAS_DBUS_OLD_IFACE)
                elif self.state == 4:
                    self.state = 5
                    if_obj.disconnect(dbus_interface=WPAS_DBUS_OLD_IFACE)
                elif self.state == 6:
                    self.state = 7
                    if_obj.removeNetwork(self.path,
                                         dbus_interface=WPAS_DBUS_OLD_IFACE)
                    # The network is gone now, so a second removal has to
                    # be rejected.
                    try:
                        if_obj.removeNetwork(self.path,
                                             dbus_interface=WPAS_DBUS_OLD_IFACE)
                        raise Exception("Invalid removeNetwork accepted")
                    except dbus.exceptions.DBusException as e:
                        if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidNetwork"):
                            raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))

                    self.loop.quit()
            elif new == "DISCONNECTED":
                if self.state == 1:
                    self.state = 2
                    self.netw_obj.enable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
                elif self.state == 3:
                    self.state = 4
                    if_obj.selectNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
                elif self.state == 5:
                    self.state = 6
                    if_obj.selectNetwork(self.path,
                                         dbus_interface=WPAS_DBUS_OLD_IFACE)

        def run_connect(self, *args):
            logger.debug("run_connect")
            path = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
            netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
            netw_obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
            params = dbus.Dictionary({'ssid': ssid,
                                      'key_mgmt': 'WPA-PSK',
                                      'psk': passphrase,
                                      'scan_freq': 2412},
                                     signature='sv')
            netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
            netw_obj.enable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
            # Saved for the signal handler above
            self.path = path
            self.netw_obj = netw_obj
            # False so that the timeout source is not rescheduled
            return False

        def success(self):
            return self.state == 7

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    # The network added in run_connect() was removed in the final state
    if len(dev[0].list_networks()) != 0:
        raise Exception("Unexpected network")
 | |
| 
 | |
def test_dbus_old_connect_eap(dev, apdev):
    """The old D-Bus interface - add an EAP network and connect"""
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    ssid = "test-wpa2-eap"
    ap_params = hostapd.wpa2_eap_params(ssid=ssid)
    hapd = hostapd.add_ap(apdev[0]['ifname'], ap_params)

    class TestDbusConnect(TestDbus):
        """Add an EAP-TTLS/PAP network and wait for COMPLETED.

        Success additionally requires that a Certification signal was
        seen before the loop was stopped.
        """

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.connected = False
            self.certification_received = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.stateChange, WPAS_DBUS_OLD_IFACE,
                            "StateChange")
            self.add_signal(self.certification, WPAS_DBUS_OLD_IFACE,
                            "Certification")
            self.loop.run()
            return self

        def stateChange(self, new, old):
            logger.debug("stateChange: %s --> %s" % (old, new))
            # Only the COMPLETED state is of interest here
            if new != "COMPLETED":
                return
            self.connected = True
            self.loop.quit()

        def certification(self, depth, subject, cert_hash, cert_hex):
            details = "certification: depth={} subject={} hash={} cert_hex={}".format(depth, subject, cert_hash, cert_hex)
            logger.debug(details)
            self.certification_received = True

        def run_connect(self, *args):
            logger.debug("run_connect")
            net_path = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
            net_proxy = bus.get_object(WPAS_DBUS_OLD_SERVICE, net_path)
            settings = {'ssid': ssid,
                        'key_mgmt': 'WPA-EAP',
                        'eap': 'TTLS',
                        'anonymous_identity': 'ttls',
                        'identity': 'pap user',
                        'ca_cert': 'auth_serv/ca.pem',
                        'phase2': 'auth=PAP',
                        'password': 'password',
                        'scan_freq': 2412}
            net_params = dbus.Dictionary(settings, signature='sv')
            net_proxy.set(net_params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
            net_proxy.enable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
            self.path = net_path
            self.netw_obj = net_proxy
            # False so that the timeout source is not rescheduled
            return False

        def success(self):
            return self.connected and self.certification_received

    with TestDbusConnect(bus) as runner:
        if not runner.success():
            raise Exception("Expected signals not seen")
 | |
| 
 | |
def test_dbus_old_wps_pbc(dev, apdev):
    """The old D-Bus interface and WPS/PBC"""
    # The real test body lives in _test_dbus_old_wps_pbc(); this wrapper
    # only guarantees that wps_cred_processing is set back to 0 no matter
    # how that body exits.
    try:
        outcome = _test_dbus_old_wps_pbc(dev, apdev)
    finally:
        dev[0].request("SET wps_cred_processing 0")
    return outcome
 | |
| 
 | |
def _test_dbus_old_wps_pbc(dev, apdev):
    """Body of test_dbus_old_wps_pbc.

    Runs wpsPbc() over the old D-Bus interface twice, first with "any" and
    then with the AP's BSSID, and expects a WpsCred signal and a
    connection each time. Sets wps_cred_processing to 2 on dev[0]; the
    caller is responsible for restoring it.
    """
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PBC")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    # Arguments that are neither "any" nor a BSSID string
    for arg in [123, "123"]:
        try:
            if_obj.wpsPbc(arg, dbus_interface=WPAS_DBUS_OLD_IFACE)
            raise Exception("Invalid wpsPbc arguments accepted: " + str(arg))
        except dbus.exceptions.DBusException as e:
            if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
                raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))

    class TestDbusWps(TestDbusOldWps):
        """Start WPS PBC with the given wpsPbc() argument."""

        def __init__(self, bus, pbc_param):
            TestDbusOldWps.__init__(self, bus)
            self.pbc_param = pbc_param

        def run_wps(self, *args):
            logger.debug("run_wps: pbc_param=" + self.pbc_param)
            if_obj.wpsPbc(self.pbc_param, dbus_interface=WPAS_DBUS_OLD_IFACE)
            # False so that the timeout source is not rescheduled
            return False

    with TestDbusWps(bus, "any") as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    res = if_obj.scanResults(dbus_interface=WPAS_DBUS_OLD_IFACE)
    if len(res) != 1:
        raise Exception("Unexpected number of scan results: " + str(res))
    for bss_path in res:
        logger.debug("Scan result BSS path: " + bss_path)
        bss_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, bss_path)
        bss = bss_obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID,
                                 byte_arrays=True)
        logger.debug("BSS: " + str(bss))

    dev[0].wait_connected(timeout=10)
    dev[0].request("DISCONNECT")
    dev[0].wait_disconnected(timeout=10)
    dev[0].request("FLUSH")

    # Second round: same procedure addressed to the specific BSSID
    hapd.request("WPS_PBC")
    dev[0].scan_for_bss(bssid, freq="2412")

    with TestDbusWps(bus, bssid) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[0].wait_connected(timeout=10)
    dev[0].request("DISCONNECT")
    dev[0].wait_disconnected(timeout=10)

    hapd.disable()
    dev[0].flush_scan_cache()
 | |
| 
 | |
def test_dbus_old_wps_pin(dev, apdev):
    """The old D-Bus interface and WPS/PIN"""
    # The real test body lives in _test_dbus_old_wps_pin(); this wrapper
    # only guarantees that wps_cred_processing is set back to 0 no matter
    # how that body exits.
    try:
        outcome = _test_dbus_old_wps_pin(dev, apdev)
    finally:
        dev[0].request("SET wps_cred_processing 0")
    return outcome
 | |
| 
 | |
def _test_dbus_old_wps_pin(dev, apdev):
    """Body of test_dbus_old_wps_pin.

    Runs wpsPin() over the old D-Bus interface twice: once with the AP's
    BSSID and a fixed PIN, and once with "any" and an empty PIN, in which
    case the value returned by wpsPin() is handed to the AP. Sets
    wps_cred_processing to 2 on dev[0]; the caller is responsible for
    restoring it.
    """
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PIN any 12345670")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    # First arguments that are neither "any" nor a BSSID string
    for arg in [(123, "12345670"),
                ("123", "12345670")]:
        try:
            if_obj.wpsPin(arg[0], arg[1], dbus_interface=WPAS_DBUS_OLD_IFACE)
            raise Exception("Invalid wpsPin arguments accepted: " + str(arg))
        except dbus.exceptions.DBusException as e:
            if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
                raise Exception("Unexpected error message for invalid wpsPin: " + str(e))

    class TestDbusWps(TestDbusOldWps):
        """Start WPS PIN with the given BSSID and PIN arguments."""

        def __init__(self, bus, bssid, pin):
            TestDbusOldWps.__init__(self, bus)
            self.bssid = bssid
            self.pin = pin

        def run_wps(self, *args):
            logger.debug("run_wps %s %s" % (self.bssid, self.pin))
            pin = if_obj.wpsPin(self.bssid, self.pin,
                                dbus_interface=WPAS_DBUS_OLD_IFACE)
            # With an empty PIN argument, pass the PIN returned by
            # wpsPin() on to the AP.
            if len(self.pin) == 0:
                h = hostapd.Hostapd(apdev[0]['ifname'])
                h.request("WPS_PIN any " + pin)
            # False so that the timeout source is not rescheduled
            return False

    with TestDbusWps(bus, bssid, "12345670") as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[0].wait_connected(timeout=10)
    dev[0].request("DISCONNECT")
    dev[0].wait_disconnected(timeout=10)
    dev[0].request("FLUSH")

    dev[0].scan_for_bss(bssid, freq="2412")

    with TestDbusWps(bus, "any", "") as t:
        if not t.success():
            raise Exception("Expected signals not seen")
 | |
| 
 | |
def test_dbus_old_wps_reg(dev, apdev):
    """The old D-Bus interface and WPS/Registar"""
    # The real test body lives in _test_dbus_old_wps_reg(); this wrapper
    # only guarantees that wps_cred_processing is set back to 0 no matter
    # how that body exits.
    try:
        outcome = _test_dbus_old_wps_reg(dev, apdev)
    finally:
        dev[0].request("SET wps_cred_processing 0")
    return outcome
 | |
| 
 | |
def _test_dbus_old_wps_reg(dev, apdev):
    """Body of test_dbus_old_wps_reg.

    Runs wpsReg() over the old D-Bus interface against the AP's BSSID with
    PIN 12345670 and expects a WpsCred signal followed by a connection.
    Sets wps_cred_processing to 2 on dev[0]; the caller is responsible for
    restoring it.
    """
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    hapd = start_ap(apdev[0])
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    # First arguments that are not a BSSID string
    for arg in [(123, "12345670"),
                ("123", "12345670")]:
        try:
            if_obj.wpsReg(arg[0], arg[1], dbus_interface=WPAS_DBUS_OLD_IFACE)
            raise Exception("Invalid wpsReg arguments accepted: " + str(arg))
        except dbus.exceptions.DBusException as e:
            if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
                raise Exception("Unexpected error message for invalid wpsReg: " + str(e))

    class TestDbusWps(TestDbusOldWps):
        """Start WPS registrar operation against the AP's BSSID."""

        def run_wps(self, *args):
            logger.debug("run_wps")
            if_obj.wpsReg(bssid, "12345670", dbus_interface=WPAS_DBUS_OLD_IFACE)
            # False so that the timeout source is not rescheduled
            return False

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[0].wait_connected(timeout=10)
 |