@ -11,21 +11,80 @@ logger = logging.getLogger()
from wpasupplicant import WpaSupplicant
def config_data_test ( dev1 , dev2 , dev1group , dev2group , ifname1 , ifname2 ) :
cmd = " DATA_TEST_CONFIG 1 "
if ifname1 :
cmd = cmd + " ifname= " + ifname1
if dev1group :
res = dev1 . group_request ( cmd )
else :
res = dev1 . request ( cmd )
if " OK " not in res :
raise Exception ( " Failed to enable data test functionality " )
cmd = " DATA_TEST_CONFIG 1 "
if ifname2 :
cmd = cmd + " ifname= " + ifname2
if dev2group :
res = dev2 . group_request ( cmd )
else :
res = dev2 . request ( cmd )
if " OK " not in res :
raise Exception ( " Failed to enable data test functionality " )
def run_multicast_connectivity_test ( dev1 , dev2 , tos = None ,
dev1group = False , dev2group = False ,
ifname1 = None , ifname2 = None ,
config = True , timeout = 5 ,
send_len = None , multicast_to_unicast = False ,
broadcast_retry_c = 1 ) :
addr1 = dev1 . get_addr ( dev1group )
addr2 = dev2 . get_addr ( dev2group )
if config :
config_data_test ( dev1 , dev2 , dev1group , dev2group , ifname1 , ifname2 )
cmd = " DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {} " . format ( addr1 , tos )
if send_len is not None :
cmd + = " len= " + str ( send_len )
for i in range ( broadcast_retry_c ) :
try :
if dev1group :
dev1 . group_request ( cmd )
else :
dev1 . request ( cmd )
if dev2group :
ev = dev2 . wait_group_event ( [ " DATA-TEST-RX " ] ,
timeout = timeout )
else :
ev = dev2 . wait_event ( [ " DATA-TEST-RX " ] , timeout = timeout )
if ev is None :
raise Exception ( " dev1->dev2 broadcast data delivery failed " )
if multicast_to_unicast :
if " DATA-TEST-RX ff:ff:ff:ff:ff:ff {} " . format ( addr1 ) in ev :
raise Exception ( " Unexpected dev1->dev2 broadcast data result: multicast to unicast conversion missing " )
if " DATA-TEST-RX {} {} " . format ( addr2 , addr1 ) not in ev :
raise Exception ( " Unexpected dev1->dev2 broadcast data result (multicast to unicast enabled) " )
else :
if " DATA-TEST-RX ff:ff:ff:ff:ff:ff {} " . format ( addr1 ) not in ev :
raise Exception ( " Unexpected dev1->dev2 broadcast data result " )
if send_len is not None :
if " len= " + str ( send_len ) not in ev :
raise Exception ( " Unexpected dev1->dev2 broadcast data length " )
else :
if " len= " in ev :
raise Exception ( " Unexpected dev1->dev2 broadcast data length " )
break
except Exception as e :
if i == broadcast_retry_c - 1 :
raise
def run_connectivity_test ( dev1 , dev2 , tos , dev1group = False , dev2group = False ,
ifname1 = None , ifname2 = None , config = True , timeout = 5 ,
multicast_to_unicast = False , broadcast = True ,
send_len = None ) :
addr1 = dev1 . own_addr ( )
if not dev1group and isinstance ( dev1 , WpaSupplicant ) :
addr = dev1 . get_driver_status_field ( ' addr ' )
if addr :
addr1 = addr
addr2 = dev2 . own_addr ( )
if not dev2group and isinstance ( dev2 , WpaSupplicant ) :
addr = dev2 . get_driver_status_field ( ' addr ' )
if addr :
addr2 = addr
addr1 = dev1 . get_addr ( dev1group )
addr2 = dev2 . get_addr ( dev2group )
dev1 . dump_monitor ( )
dev2 . dump_monitor ( )
@ -37,25 +96,7 @@ def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
try :
if config :
cmd = " DATA_TEST_CONFIG 1 "
if ifname1 :
cmd = cmd + " ifname= " + ifname1
if dev1group :
res = dev1 . group_request ( cmd )
else :
res = dev1 . request ( cmd )
if " OK " not in res :
raise Exception ( " Failed to enable data test functionality " )
cmd = " DATA_TEST_CONFIG 1 "
if ifname2 :
cmd = cmd + " ifname= " + ifname2
if dev2group :
res = dev2 . group_request ( cmd )
else :
res = dev2 . request ( cmd )
if " OK " not in res :
raise Exception ( " Failed to enable data test functionality " )
config_data_test ( dev1 , dev2 , dev1group , dev2group , ifname1 , ifname2 )
cmd = " DATA_TEST_TX {} {} {} " . format ( addr2 , addr1 , tos )
if send_len is not None :
@ -80,34 +121,10 @@ def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
raise Exception ( " Unexpected dev1->dev2 unicast data length " )
if broadcast :
cmd = " DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {} " . format ( addr1 , tos )
if send_len is not None :
cmd + = " len= " + str ( send_len )
for i in range ( broadcast_retry_c ) :
try :
if dev1group :
dev1 . group_request ( cmd )
else :
dev1 . request ( cmd )
if dev2group :
ev = dev2 . wait_group_event ( [ " DATA-TEST-RX " ] ,
timeout = timeout )
else :
ev = dev2 . wait_event ( [ " DATA-TEST-RX " ] , timeout = timeout )
if ev is None :
raise Exception ( " dev1->dev2 broadcast data delivery failed " )
if " DATA-TEST-RX ff:ff:ff:ff:ff:ff {} " . format ( addr1 ) not in ev :
raise Exception ( " Unexpected dev1->dev2 broadcast data result " )
if send_len is not None :
if " len= " + str ( send_len ) not in ev :
raise Exception ( " Unexpected dev1->dev2 broadcast data length " )
else :
if " len= " in ev :
raise Exception ( " Unexpected dev1->dev2 broadcast data length " )
break
except Exception as e :
if i == broadcast_retry_c - 1 :
raise
run_multicast_connectivity_test ( dev1 , dev2 , tos ,
dev1group , dev2group ,
ifname1 , ifname2 , False , timeout ,
send_len , False , broadcast_retry_c )
cmd = " DATA_TEST_TX {} {} {} " . format ( addr1 , addr2 , tos )
if send_len is not None :
@ -132,40 +149,12 @@ def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
raise Exception ( " Unexpected dev2->dev1 unicast data length " )
if broadcast :
cmd = " DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {} " . format ( addr2 , tos )
if send_len is not None :
cmd + = " len= " + str ( send_len )
for i in range ( broadcast_retry_c ) :
try :
if dev2group :
dev2 . group_request ( cmd )
else :
dev2 . request ( cmd )
if dev1group :
ev = dev1 . wait_group_event ( [ " DATA-TEST-RX " ] ,
timeout = timeout )
else :
ev = dev1 . wait_event ( [ " DATA-TEST-RX " ] , timeout = timeout )
if ev is None :
raise Exception ( " dev2->dev1 broadcast data delivery failed " )
if multicast_to_unicast :
if " DATA-TEST-RX ff:ff:ff:ff:ff:ff {} " . format ( addr2 ) in ev :
raise Exception ( " Unexpected dev2->dev1 broadcast data result: multicast to unicast conversion missing " )
if " DATA-TEST-RX {} {} " . format ( addr1 , addr2 ) not in ev :
raise Exception ( " Unexpected dev2->dev1 broadcast data result (multicast to unicast enabled) " )
else :
if " DATA-TEST-RX ff:ff:ff:ff:ff:ff {} " . format ( addr2 ) not in ev :
raise Exception ( " Unexpected dev2->dev1 broadcast data result " )
if send_len is not None :
if " len= " + str ( send_len ) not in ev :
raise Exception ( " Unexpected dev2->dev1 broadcast data length " )
else :
if " len= " in ev :
raise Exception ( " Unexpected dev2->dev1 broadcast data length " )
break
except Exception as e :
if i == broadcast_retry_c - 1 :
raise
run_multicast_connectivity_test ( dev2 , dev1 , tos ,
dev2group , dev1group ,
ifname2 , ifname1 , False , timeout ,
send_len , multicast_to_unicast ,
broadcast_retry_c )
finally :
if config :
if dev1group :