hostap/wpa_supplicant/examples/p2p/p2p_invite.py

202 lines
5.1 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/python
# Tests p2p_invite
######### MAY NEED TO RUN AS SUDO #############
import dbus
import sys, os
import time
import gobject
import getopt
import threading
from dbus.mainloop.glib import DBusGMainLoop
def usage():
print("Usage:")
print(" %s -i <interface_name> -a <addr> \ " \
% sys.argv[0])
print(" [-o <persistent_group_object>] [-w <wpas_dbus_interface>]")
print("Options:")
print(" -i = interface name")
print(" -a = address of peer")
print(" -o = persistent group object path")
print(" -w = wpas dbus interface = fi.w1.wpa_supplicant1")
print("Example:")
print(" %s -i p2p-wlan0-0 -a 00150083523c" % sys.argv[0])
# Required Signals
def InvitationResult(invite_result):
print("Invitation Result signal :")
status = invite_result['status']
print("status = ", status)
if invite_result.has_key('BSSID'):
bssid = invite_result['BSSID']
print("BSSID = ", hex(bssid[0]) , ":" , \
hex(bssid[1]) , ":" , hex(bssid[2]) , ":", \
hex(bssid[3]) , ":" , hex(bssid[4]) , ":" , \
hex(bssid[5]))
os._exit(0)
class P2P_Invite (threading.Thread):
# Needed Variables
global bus
global wpas_object
global interface_object
global p2p_interface
global interface_name
global wpas
global wpas_dbus_interface
global path
global addr
global persistent_group_object
# Dbus Paths
global wpas_dbus_opath
global wpas_dbus_interfaces_opath
global wpas_dbus_interfaces_interface
global wpas_dbus_interfaces_p2pdevice
# Arguments
global P2PDictionary
# Constructor
def __init__(self,interface_name,wpas_dbus_interface,addr,
persistent_group_object):
# Initializes variables and threads
self.interface_name = interface_name
self.wpas_dbus_interface = wpas_dbus_interface
self.addr = addr
self.persistent_group_object = persistent_group_object
# Initializes thread and daemon allows for ctrl-c kill
threading.Thread.__init__(self)
self.daemon = True
# Generating interface/object paths
self.wpas_dbus_opath = "/" + \
self.wpas_dbus_interface.replace(".","/")
self.wpas_wpas_dbus_interfaces_opath = self.wpas_dbus_opath + \
"/Interfaces"
self.wpas_dbus_interfaces_interface = \
self.wpas_dbus_interface + ".Interface"
self.wpas_dbus_interfaces_p2pdevice = \
self.wpas_dbus_interfaces_interface \
+ ".P2PDevice"
# Getting interfaces and objects
DBusGMainLoop(set_as_default=True)
self.bus = dbus.SystemBus()
self.wpas_object = self.bus.get_object(
self.wpas_dbus_interface,
self.wpas_dbus_opath)
self.wpas = dbus.Interface(self.wpas_object,
self.wpas_dbus_interface)
# Try to see if supplicant knows about interface
# If not, throw an exception
try:
self.path = self.wpas.GetInterface(
self.interface_name)
except dbus.DBusException as exc:
error = 'Error:\n Interface ' + self.interface_name \
+ ' was not found'
print(error)
usage()
os._exit(0)
self.interface_object = self.bus.get_object(
self.wpas_dbus_interface, self.path)
self.p2p_interface = dbus.Interface(self.interface_object,
self.wpas_dbus_interfaces_p2pdevice)
#Adds listeners
self.bus.add_signal_receiver(InvitationResult,
dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
signal_name="InvitationResult")
# Sets up p2p_invite dictionary
def constructArguements(self):
self.P2PDictionary = \
{'peer':dbus.ObjectPath(self.path+'/Peers/'+self.addr)}
if (self.persistent_group_object != None):
self.P2PDictionary.update({"persistent_group_object":
self.persistent_group_object})
# Run p2p_invite
def run(self):
try:
self.p2p_interface.Invite(self.P2PDictionary)
except:
print("Error:\n Invalid Arguments")
usage()
os._exit(0)
# Allows other threads to keep working while MainLoop runs
# Required for timeout implementation
gobject.MainLoop().get_context().iteration(True)
gobject.threads_init()
gobject.MainLoop().run()
if __name__ == "__main__":
# Defaults for optional inputs
addr = None
persistent_group_object = None
wpas_dbus_interface = 'fi.w1.wpa_supplicant1'
# interface_name is required
interface_name = None
# Using getopts to handle options
try:
options, args = getopt.getopt(sys.argv[1:],"hi:o:w:a:")
except getopt.GetoptError:
usage()
quit()
# If there's a switch, override default option
for key, value in options:
# Help
if (key == "-h"):
usage()
quit()
# Interface Name
elif (key == "-i"):
interface_name = value
elif (key == "-a"):
addr = value
# Persistent group object path
elif (key == "-o"):
persistent_group_object = value
# Dbus interface
elif (key == "-w"):
wpas_dbus_interface = value
else:
assert False, "unhandled option"
# Interface name is required and was not given
if (interface_name == None):
print("Error:\n interface_name is required")
usage()
quit()
if (addr == None):
print("Error:\n peer address is required")
usage()
quit()
try:
p2p_invite_test = \
P2P_Invite(interface_name,wpas_dbus_interface,
addr,persistent_group_object)
except:
print("Error:\n Invalid Arguments")
usage()
os._exit(1)
p2p_invite_test.constructArguements()
p2p_invite_test.start()
time.sleep(10)
print("Error:\n p2p_invite timed out")
os._exit(0)