$search
00001 #! /usr/bin/env python 00002 00003 import sys 00004 import time 00005 import subprocess 00006 import unittest 00007 import re 00008 00009 import roslib; roslib.load_manifest('network_control_tests') 00010 import rospy 00011 import rostest 00012 00013 import dynamic_reconfigure.client 00014 from network_monitor_udp.linktest import UdpmonsourceHandle 00015 from network_monitor_udp.linktest import LinkTest 00016 from network_monitor_udp.msg import LinktestGoal 00017 from ieee80211_channels.channels import IEEE80211_Channels 00018 00019 AP_REAL_IP = "192.168.68.1" 00020 AP_FAKE_IP = "192.168.69.1" 00021 STA_REAL_IP = "192.168.69.2" 00022 STA_FAKE_IP = "192.168.68.2" 00023 00024 class HostapdTest(unittest.TestCase): 00025 def __init__(self, *args): 00026 super(HostapdTest, self).__init__(*args) 00027 rospy.init_node('hostapd_access_point_test') 00028 self.ap1_iface = rospy.get_param('~ap1_iface') 00029 self.ap2_iface = rospy.get_param('~ap2_iface') 00030 self.sta_iface = rospy.get_param('~sta_iface') 00031 00032 self.dyn_ap1 = dynamic_reconfigure.client.Client("hostapd1") 00033 self.dyn_ap2 = dynamic_reconfigure.client.Client("hostapd2") 00034 self.reset_params = { 00035 "enabled" : False, 00036 "ssid": "test", 00037 "wmm": False, 00038 "mode": 'b', 00039 "freq": 2412e6, 00040 "ieee80211n": False, 00041 "encryption_mode": "open", 00042 "encryption_pass": "", 00043 "txpower_auto": "True", 00044 "txpower": 0, 00045 "bitrate": 0 } 00046 self.dyn_ap1.update_configuration(self.reset_params) 00047 self.dyn_ap2.update_configuration(self.reset_params) 00048 00049 self.hwsim_nat_setup_path = \ 00050 roslib.packages.find_node('hostapd_access_point', 'hwsim_nat_setup.sh') 00051 00052 self.srcnode = UdpmonsourceHandle('performance_test') 00053 self.srcnode.cancel_all_tests() 00054 00055 def setUp(self): 00056 pass 00057 00058 def tearDown(self): 00059 self.srcnode.cancel_all_tests() 00060 self.dyn_ap1.update_configuration(self.reset_params) 00061 self.dyn_ap2.update_configuration(self.reset_params) 00062 subprocess.call(["wpa_cli", "-i", self.sta_iface, "terminate"], 00063 stdout = subprocess.PIPE, stderr = subprocess.STDOUT) 00064 00065 def setup_nat_rules(self): 00066 p = subprocess.Popen([self.hwsim_nat_setup_path, 00067 self.ap1_iface, AP_REAL_IP, AP_FAKE_IP, 00068 self.sta_iface, STA_REAL_IP, STA_FAKE_IP], 00069 stdout = subprocess.PIPE, stderr = subprocess.STDOUT) 00070 (out, err) = p.communicate() 00071 self.assertEqual(p.returncode, 0, 00072 "Setting hwsim NAT rules on %s and %s failed: %s"% 00073 (self.ap1_iface, self.sta_iface, out)) 00074 00075 def start_wpa_supplicant(self, conffile): 00076 supp_conf_path = roslib.packages.find_resource("network_control_tests", conffile) 00077 supp_conf_path = supp_conf_path[0] 00078 ret = subprocess.call(["wpa_supplicant", "-Dnl80211", "-i" + self.sta_iface, "-B", "-c" + supp_conf_path], 00079 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 00080 self.assertEqual(ret, 0, 00081 "Starting wpa_supplicant with WEP conf on %s failed with ret code %d"% 00082 (self.sta_iface, ret)) 00083 00084 def test_freq(self): 00085 freq = IEEE80211_Channels.get_freq(5, IEEE80211_Channels.BAND_2400_MHz) 00086 config = self.dyn_ap1.update_configuration({"enabled": True, "ssid": "testnet1", 00087 "mode": 'b', "freq": freq}) 00088 self.assertEqual(config['status'], "OK", 00089 "Operation failed: " + config['errmsg']) 00090 ap1_info = IwconfigInfo(self.ap1_iface) 00091 self.assertEqual(ap1_info.freq(), freq, 00092 "Expected freq to be %ld, but instead it was %ld"%(freq, ap1_info.freq())) 00093 00094 freq = IEEE80211_Channels.get_freq(44, IEEE80211_Channels.BAND_5000_MHz) 00095 config = self.dyn_ap1.update_configuration({"freq": freq}) 00096 ap1_info = IwconfigInfo(self.ap1_iface) 00097 self.assertEqual(config['status'], "FAIL", 00098 "Expected setting an 802.11a freq when in 802.11b mode to fail: %s"% 00099 (ap1_info.out)) 00100 00101 config = self.dyn_ap1.update_configuration({"freq": freq, "mode": "a", "enabled": True}) 00102 ap1_info = IwconfigInfo(self.ap1_iface) 00103 self.assertEqual(ap1_info.freq(), freq, 00104 "Expected freq to be %ld, but instead it was %ld"%(freq, ap1_info.freq())) 00105 00106 def test_txpower(self): 00107 # set txpower to 0dBm 00108 config = self.dyn_ap1.update_configuration({"enabled": True, "ssid": "testnet1", 00109 "mode": 'b', "txpower_auto": False, "txpower": 0}) 00110 self.assertEqual(config['status'], "OK", 00111 "Operation failed: " + config['errmsg']) 00112 self.assertEqual(config['txpower'], 0, 00113 "Expected configuration txpower to be 0 instead it was %d"%(config['txpower'])) 00114 ap1_info = IwconfigInfo(self.ap1_iface) 00115 self.assertEqual(ap1_info.txpower(), 0, 00116 "Expected iwconfig txpower to be 0 instead it was %d"%(ap1_info.txpower())) 00117 00118 # set txpower to 10dBm 00119 config = self.dyn_ap1.update_configuration({"txpower": 10}) 00120 self.assertEqual(config['status'], "OK", 00121 "Operation failed: " + config['errmsg']) 00122 self.assertEqual(config['txpower'], 10, 00123 "Expected configuration txpower to be 10 instead it was %d"%(config['txpower'])) 00124 ap1_info = IwconfigInfo(self.ap1_iface) 00125 self.assertEqual(ap1_info.txpower(), 10, 00126 "Expected iwconfig txpower to be 10 instead it was %d"%(ap1_info.txpower())) 00127 00128 # set txpower to -10dBm, this is not legal so expecting to move to the nearest legal value, which is 0 00129 config = self.dyn_ap1.update_configuration({"txpower": -10}) 00130 self.assertEqual(config['status'], "OK", 00131 "Operation failed: " + config['errmsg']) 00132 self.assertEqual(config['txpower'], 0, 00133 "Expected configuration txpower to be 0 instead it was %d"%(config['txpower'])) 00134 ap1_info = IwconfigInfo(self.ap1_iface) 00135 self.assertEqual(ap1_info.txpower(), 0, 00136 "Expected iwconfig txpower to be 0 instead it was %d"%(ap1_info.txpower())) 00137 00138 # set txpower to 30dBm, this is not legal so expecting to move to the nearest legal value, which is 20 00139 config = self.dyn_ap1.update_configuration({"txpower": 30}) 00140 self.assertEqual(config['status'], "OK", 00141 "Operation failed: " + config['errmsg']) 00142 self.assertEqual(config['txpower'], 20, 00143 "Expected configuration txpower to be 20 instead it was %d"%(config['txpower'])) 00144 ap1_info = IwconfigInfo(self.ap1_iface) 00145 self.assertEqual(ap1_info.txpower(), 20, 00146 "Expected iwconfig txpower to be 20 instead it was %d"%(ap1_info.txpower())) 00147 00148 00149 # just tests that legal values are accepted since there is no way to read the bitrate while in master mode 00150 # and mac80211_hwsim does not actually limit rate (does not emulate rates) 00151 def test_bitrate(self): 00152 config = self.dyn_ap1.update_configuration({"enabled": True, "ssid": "testnet1", 00153 "mode": 'g', "bitrate": 1*10**6}) 00154 self.assertEqual(config['status'], "OK", 00155 "Operation failed: " + config['errmsg']) 00156 self.assertEqual(config['bitrate'], 1*10**6, 00157 "Expected configuration bitrate to be %d instead it was %d"% 00158 (1*10**6, config['bitrate'])) 00159 00160 config = self.dyn_ap1.update_configuration({"bitrate": 54*10**6}) 00161 self.assertEqual(config['status'], "OK", 00162 "Operation failed: " + config['errmsg']) 00163 self.assertEqual(config['bitrate'], 54*10**6, 00164 "Expected configuration bitrate to be %d instead it was %d"% 00165 (54*10**6, config['bitrate'])) 00166 00167 config = self.dyn_ap1.update_configuration({"bitrate": 3*10**6}) 00168 self.assertEqual(config['status'], "FAIL", 00169 "Setting an illegal bitrate of 3Mbit/s should have failed") 00170 00171 00172 def test_txpower_auto(self): 00173 config = self.dyn_ap1.update_configuration({"enabled": True, "ssid": "testnet1", 00174 "txpower_auto": True, "txpower": 0}) 00175 self.assertEqual(config['status'], "OK", 00176 "Operation failed: " + config['errmsg']) 00177 self.assertTrue(config['txpower_auto'], 00178 "Expected txpower_auto to be True") 00179 self.assertEqual(config['txpower'], 20, 00180 "Expected txpower to be maximum (20) instead it was %d"%(config['txpower'])) 00181 00182 def test_start_stop(self): 00183 config = self.dyn_ap1.update_configuration({"enabled": True, "ssid": "testnet1"}) 00184 self.assertEqual(config['status'], "OK", 00185 "Operation failed: " + config['errmsg']) 00186 self.assertTrue(config['enabled'], 00187 "Expected hostapd to be running") 00188 for i in range(3): 00189 config = self.dyn_ap1.update_configuration({"enabled": False}) 00190 self.assertEqual(config['status'], "OK", 00191 "Operation failed: " + config['errmsg']) 00192 self.assertFalse(config['enabled'], 00193 "Expected hostapd to be stopped") 00194 config = self.dyn_ap1.update_configuration({"enabled": True}) 00195 self.assertEqual(config['status'], "OK", 00196 "Operation failed: " + config['errmsg']) 00197 self.assertTrue(config['enabled'], 00198 "Expected hostapd to be running") 00199 00200 00201 def test_multiple_aps(self): 00202 config = self.dyn_ap1.update_configuration({"enabled": True, "ssid": "testnet1"}) 00203 self.assertEqual(config['status'], "OK", 00204 "Operation failed: " + config['errmsg']) 00205 self.assertTrue(config['enabled'], 00206 "Expected hostapd to be running") 00207 config = self.dyn_ap2.update_configuration({"enabled": True, "ssid": "testnet2"}) 00208 self.assertEqual(config['status'], "OK", 00209 "Operation failed: " + config['errmsg']) 00210 self.assertTrue(config['enabled'], 00211 "Expected hostapd to be running") 00212 ret = subprocess.call(["ifconfig", self.sta_iface, "up"], 00213 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 00214 self.assertEqual(ret, 0, 00215 "ifup on interface %s failed with ret code %d"% 00216 (self.sta_iface, ret)) 00217 p = subprocess.Popen(["iwlist", self.sta_iface, "scan"], stdout = subprocess.PIPE, stderr = subprocess.PIPE) 00218 (out, err) = p.communicate() 00219 self.assertTrue(re.findall('testnet1', out), 00220 "Expected testnet1 ssid to be found in station scan: " + out) 00221 self.assertTrue(re.findall('testnet2', out), 00222 "Expected testnet2 ssid to be found in station scan: " + out) 00223 00224 def test_ieee80211n(self): 00225 config = self.dyn_ap1.update_configuration({"ieee80211n": True, "enabled": True}) 00226 self.assertEqual(config['status'], "OK", 00227 "Operation failed: " + config['errmsg']) 00228 00229 # needs a newer hostapd version than available in Ubuntu 10.04 00230 # it has been tested with hostapd 0.7.3 which has wmm support 00231 def disabled_test_wmm(self): 00232 config = self.dyn_ap1.update_configuration({"wmm": True, "enabled": True}) 00233 self.assertEqual(config['status'], "OK", 00234 "Operation failed: " + config['errmsg']) 00235 00236 def test_encryption(self): 00237 #bring up AP in WPA mode (with wrong key) 00238 config = self.dyn_ap1.update_configuration({"ssid": "testnet1", 00239 "enabled": True, "encryption_mode": "wpa", 00240 "encryption_pass": "violetsareblue"}) 00241 self.assertEqual(config['status'], "OK", 00242 "Operation failed: " + config['errmsg']) 00243 00244 # setup NAT rules 00245 self.setup_nat_rules() 00246 00247 # starting wpa_supplicant on sta_interface in WEP mode 00248 self.start_wpa_supplicant("wpa_supplicant_wpa.conf") 00249 # give it some time to connect 00250 time.sleep(5.0) 00251 00252 test = self.srcnode.create_test(bw = 2.0 * 10**6, pktsize = 1500, duration = 5.0, 00253 sink_ip = AP_FAKE_IP, sink_port = 12345, 00254 update_interval = 0.2) 00255 test.start() 00256 time.sleep(5.5) 00257 self.assertTrue(test.linkdown(), 00258 "Link should be down since WPA pass is wrong") 00259 00260 # set the correct key 00261 config = self.dyn_ap1.update_configuration({"encryption_pass": "rosesarered"}) 00262 self.assertEqual(config['status'], "OK", 00263 "Operation failed: " + config['errmsg']) 00264 00265 # setup NAT rules again to make sure ARP entries are in place 00266 self.setup_nat_rules() 00267 00268 # give it some time to reconnect 00269 time.sleep(5.0) 00270 00271 test = self.srcnode.create_test(bw = 2.0 * 10**6, pktsize = 1500, duration = 5.0, 00272 sink_ip = AP_FAKE_IP, sink_port = 12345, 00273 update_interval = 0.2) 00274 test.start() 00275 time.sleep(5.5) 00276 self.assertFalse(test.linkdown(), 00277 "Link should be up since WPA pass is now correct") 00278 00279 # stop wpa_supplicant 00280 subprocess.call(["wpa_cli", "-i", self.sta_iface, "terminate"], 00281 stdout = subprocess.PIPE, stderr = subprocess.STDOUT) 00282 00283 # starting wpa_supplicant on sta_interface in WPA2 mode 00284 self.start_wpa_supplicant("wpa_supplicant_wpa2.conf") 00285 00286 # give it some time to reconnect 00287 time.sleep(5.0) 00288 00289 test = self.srcnode.create_test(bw = 2.0 * 10**6, pktsize = 1500, duration = 5.0, 00290 sink_ip = AP_FAKE_IP, sink_port = 12345, 00291 update_interval = 0.2) 00292 test.start() 00293 time.sleep(5.5) 00294 self.assertTrue(test.linkdown(), 00295 "Link should be down since client wants WPA2, but AP is in WPA mode") 00296 00297 # set the correct key 00298 config = self.dyn_ap1.update_configuration({"encryption_mode": "wpa_wpa2"}) 00299 self.assertEqual(config['status'], "OK", 00300 "Operation failed: " + config['errmsg']) 00301 00302 # setup NAT rules again to make sure ARP entries are in place 00303 self.setup_nat_rules() 00304 00305 # give it some time to reconnect 00306 time.sleep(5.0) 00307 00308 test = self.srcnode.create_test(bw = 2.0 * 10**6, pktsize = 1500, duration = 5.0, 00309 sink_ip = AP_FAKE_IP, sink_port = 12345, 00310 update_interval = 0.2) 00311 test.start() 00312 time.sleep(5.5) 00313 self.assertFalse(test.linkdown(), 00314 "Link should be up because AP is in WPA-WPA2 mode, and STA in WPA2") 00315 00316 00317 class IwconfigInfo: 00318 def __init__(self, interface): 00319 p = subprocess.Popen(["iwconfig", interface], 00320 stdout = subprocess.PIPE, stderr = subprocess.PIPE) 00321 (self.out, err) = p.communicate() 00322 if p.returncode != 0: 00323 raise IOError(p.returncode, "iwconfig on %s failed: %s"%(interface, err)) 00324 00325 def is_master(self): 00326 return re.findall('Mode:\s*(.aster)', self.out) 00327 00328 def is_station(self): 00329 return re.findall('Mode:\s*(.anaged)', self.out) 00330 00331 def freq(self): 00332 freq = re.findall('Frequency:\s*(\S+)\s*GHz', self.out) 00333 if not freq: 00334 raise IOError(-1, "Could not find frequency in iwconfig output: %s"%(self.out)) 00335 return long(float(freq[0]) * 1e9) 00336 00337 def txpower(self): 00338 txpower = re.findall('Tx-Power=\s*(\S+)\s*dBm', self.out) 00339 if not txpower: 00340 raise IOError(-1, "Could not find tx power in iwconfig output: %s"%(self.out)) 00341 return int(txpower[0]) 00342 00343 def ap(self): 00344 if self.is_master(): 00345 raise TypeError("Interface is in master mode, can't be associated to an AP") 00346 00347 ap = re.findall('Access Point: \s*(\w\w:\w\w:\w\w:\w\w:\w\w:\w\w)', self.out) 00348 00349 if ap: 00350 return ap[0] 00351 else: 00352 return None 00353 00354 def associated(self): 00355 return self.ap() is not None 00356 00357 if __name__ == '__main__': 00358 try: 00359 rostest.run('network_control_tests', 'hostapd_test', HostapdTest) 00360 except KeyboardInterrupt, e: 00361 pass