tests: D-Bus PropertiesChanged events on P2P Peer.Groups
Verify that Groups list for a P2P Peer gets updated properly on group addition and removal (three different paths). Signed-off-by: Jouni Malinen <j@w1.fi>
This commit is contained in:
parent
9f59fe8dc8
commit
20fd8defa0
1 changed files with 206 additions and 2 deletions
|
@ -4086,6 +4086,8 @@ def test_dbus_p2p_go_neg_init(dev, apdev):
|
|||
def __init__(self, bus):
|
||||
TestDbus.__init__(self, bus)
|
||||
self.done = False
|
||||
self.peer_group_added = False
|
||||
self.peer_group_removed = False
|
||||
|
||||
def __enter__(self):
|
||||
gobject.timeout_add(1, self.run_test)
|
||||
|
@ -4100,6 +4102,8 @@ def test_dbus_p2p_go_neg_init(dev, apdev):
|
|||
"GroupStarted")
|
||||
self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
|
||||
"GroupFinished")
|
||||
self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
|
||||
"PropertiesChanged")
|
||||
self.loop.run()
|
||||
return self
|
||||
|
||||
|
@ -4135,7 +4139,19 @@ def test_dbus_p2p_go_neg_init(dev, apdev):
|
|||
def groupFinished(self, properties):
|
||||
logger.debug("groupFinished: " + str(properties))
|
||||
self.done = True
|
||||
self.loop.quit()
|
||||
|
||||
def propertiesChanged(self, interface_name, changed_properties,
|
||||
invalidated_properties):
|
||||
logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
|
||||
if interface_name != WPAS_DBUS_P2P_PEER:
|
||||
return
|
||||
if "Groups" not in changed_properties:
|
||||
return
|
||||
if len(changed_properties["Groups"]) > 0:
|
||||
self.peer_group_added = True
|
||||
if len(changed_properties["Groups"]) == 0:
|
||||
self.peer_group_removed = True
|
||||
self.loop.quit()
|
||||
|
||||
def run_test(self, *args):
|
||||
logger.debug("run_test")
|
||||
|
@ -4143,7 +4159,195 @@ def test_dbus_p2p_go_neg_init(dev, apdev):
|
|||
return False
|
||||
|
||||
def success(self):
|
||||
return self.done
|
||||
return self.done and self.peer_group_added and self.peer_group_removed
|
||||
|
||||
with TestDbusP2p(bus) as t:
|
||||
if not t.success():
|
||||
raise Exception("Expected signals not seen")
|
||||
|
||||
def test_dbus_p2p_group_termination_by_go(dev, apdev):
|
||||
"""D-Bus P2P group removal on GO terminating the group"""
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
|
||||
addr0 = dev[0].p2p_dev_addr()
|
||||
dev[1].p2p_listen()
|
||||
|
||||
class TestDbusP2p(TestDbus):
|
||||
def __init__(self, bus):
|
||||
TestDbus.__init__(self, bus)
|
||||
self.done = False
|
||||
self.peer_group_added = False
|
||||
self.peer_group_removed = False
|
||||
|
||||
def __enter__(self):
|
||||
gobject.timeout_add(1, self.run_test)
|
||||
gobject.timeout_add(15000, self.timeout)
|
||||
self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
|
||||
"DeviceFound")
|
||||
self.add_signal(self.goNegotiationSuccess,
|
||||
WPAS_DBUS_IFACE_P2PDEVICE,
|
||||
"GONegotiationSuccess",
|
||||
byte_arrays=True)
|
||||
self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
|
||||
"GroupStarted")
|
||||
self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
|
||||
"GroupFinished")
|
||||
self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
|
||||
"PropertiesChanged")
|
||||
self.loop.run()
|
||||
return self
|
||||
|
||||
def deviceFound(self, path):
|
||||
logger.debug("deviceFound: path=%s" % path)
|
||||
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
||||
args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
|
||||
'go_intent': 0 }
|
||||
p2p.Connect(args)
|
||||
|
||||
ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
|
||||
if ev is None:
|
||||
raise Exception("Timeout while waiting for GO Neg Request")
|
||||
dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
|
||||
ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15);
|
||||
if ev is None:
|
||||
raise Exception("Group formation timed out")
|
||||
self.sta_group_ev = ev
|
||||
|
||||
def goNegotiationSuccess(self, properties):
|
||||
logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
|
||||
|
||||
def groupStarted(self, properties):
|
||||
logger.debug("groupStarted: " + str(properties))
|
||||
g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
|
||||
properties['interface_object'])
|
||||
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
||||
dev1.group_form_result(self.sta_group_ev)
|
||||
dev1.remove_group()
|
||||
|
||||
def groupFinished(self, properties):
|
||||
logger.debug("groupFinished: " + str(properties))
|
||||
self.done = True
|
||||
|
||||
def propertiesChanged(self, interface_name, changed_properties,
|
||||
invalidated_properties):
|
||||
logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
|
||||
if interface_name != WPAS_DBUS_P2P_PEER:
|
||||
return
|
||||
if "Groups" not in changed_properties:
|
||||
return
|
||||
if len(changed_properties["Groups"]) > 0:
|
||||
self.peer_group_added = True
|
||||
if len(changed_properties["Groups"]) == 0:
|
||||
self.peer_group_removed = True
|
||||
self.loop.quit()
|
||||
|
||||
def run_test(self, *args):
|
||||
logger.debug("run_test")
|
||||
p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
|
||||
return False
|
||||
|
||||
def success(self):
|
||||
return self.done and self.peer_group_added and self.peer_group_removed
|
||||
|
||||
with TestDbusP2p(bus) as t:
|
||||
if not t.success():
|
||||
raise Exception("Expected signals not seen")
|
||||
|
||||
def test_dbus_p2p_group_idle_timeout(dev, apdev):
|
||||
"""D-Bus P2P group removal on idle timeout"""
|
||||
try:
|
||||
dev[0].global_request("SET p2p_group_idle 1")
|
||||
_test_dbus_p2p_group_idle_timeout(dev, apdev)
|
||||
finally:
|
||||
dev[0].global_request("SET p2p_group_idle 0")
|
||||
|
||||
def _test_dbus_p2p_group_idle_timeout(dev, apdev):
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
|
||||
addr0 = dev[0].p2p_dev_addr()
|
||||
dev[1].p2p_listen()
|
||||
|
||||
class TestDbusP2p(TestDbus):
|
||||
def __init__(self, bus):
|
||||
TestDbus.__init__(self, bus)
|
||||
self.done = False
|
||||
self.peer_group_added = False
|
||||
self.peer_group_removed = False
|
||||
|
||||
def __enter__(self):
|
||||
gobject.timeout_add(1, self.run_test)
|
||||
gobject.timeout_add(15000, self.timeout)
|
||||
self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
|
||||
"DeviceFound")
|
||||
self.add_signal(self.goNegotiationSuccess,
|
||||
WPAS_DBUS_IFACE_P2PDEVICE,
|
||||
"GONegotiationSuccess",
|
||||
byte_arrays=True)
|
||||
self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
|
||||
"GroupStarted")
|
||||
self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
|
||||
"GroupFinished")
|
||||
self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
|
||||
"PropertiesChanged")
|
||||
self.loop.run()
|
||||
return self
|
||||
|
||||
def deviceFound(self, path):
|
||||
logger.debug("deviceFound: path=%s" % path)
|
||||
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
||||
args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
|
||||
'go_intent': 0 }
|
||||
p2p.Connect(args)
|
||||
|
||||
ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
|
||||
if ev is None:
|
||||
raise Exception("Timeout while waiting for GO Neg Request")
|
||||
dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
|
||||
ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15);
|
||||
if ev is None:
|
||||
raise Exception("Group formation timed out")
|
||||
self.sta_group_ev = ev
|
||||
|
||||
def goNegotiationSuccess(self, properties):
|
||||
logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
|
||||
|
||||
def groupStarted(self, properties):
|
||||
logger.debug("groupStarted: " + str(properties))
|
||||
g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
|
||||
properties['interface_object'])
|
||||
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
||||
dev1.group_form_result(self.sta_group_ev)
|
||||
ifaddr = dev1.group_request("STA-FIRST").splitlines()[0]
|
||||
# Force disassociation with different reason code so that the
|
||||
# P2P Client using D-Bus does not get normal group termination event
|
||||
# from the GO.
|
||||
dev1.group_request("DEAUTHENTICATE " + ifaddr + " reason=0 test=0")
|
||||
dev1.remove_group()
|
||||
|
||||
def groupFinished(self, properties):
|
||||
logger.debug("groupFinished: " + str(properties))
|
||||
self.done = True
|
||||
|
||||
def propertiesChanged(self, interface_name, changed_properties,
|
||||
invalidated_properties):
|
||||
logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
|
||||
if interface_name != WPAS_DBUS_P2P_PEER:
|
||||
return
|
||||
if "Groups" not in changed_properties:
|
||||
return
|
||||
if len(changed_properties["Groups"]) > 0:
|
||||
self.peer_group_added = True
|
||||
if len(changed_properties["Groups"]) == 0:
|
||||
self.peer_group_removed = True
|
||||
self.loop.quit()
|
||||
|
||||
def run_test(self, *args):
|
||||
logger.debug("run_test")
|
||||
p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
|
||||
return False
|
||||
|
||||
def success(self):
|
||||
return self.done and self.peer_group_added and self.peer_group_removed
|
||||
|
||||
with TestDbusP2p(bus) as t:
|
||||
if not t.success():
|
||||
|
|
Loading…
Reference in a new issue