00001
00002
00003 import sys
00004 import time
00005 import subprocess
00006 import unittest
00007 import re
00008
00009 import roslib; roslib.load_manifest('network_control_tests')
00010 import rospy
00011 import rostest
00012
00013 import dynamic_reconfigure.client
00014 from network_monitor_udp.linktest import UdpmonsourceHandle
00015 from network_monitor_udp.linktest import LinkTest
00016 from network_monitor_udp.msg import LinktestGoal
00017 from ieee80211_channels.channels import IEEE80211_Channels
00018
# Addresses handed to hwsim_nat_setup.sh by HostapdTest.setup_nat_rules(),
# in the order (AP real, AP fake, STA real, STA fake).
# Note the crossover: each side's "fake" address shares its 192.168.x prefix
# with the *other* side's "real" address.
# NOTE(review): presumably the fake addresses are NAT aliases that force
# traffic between the two local interfaces over the radio link instead of
# being short-circuited locally -- confirm against hwsim_nat_setup.sh.
AP_REAL_IP = "192.168.68.1"
AP_FAKE_IP = "192.168.69.1"  # also the sink_ip of the UDP link tests
STA_REAL_IP = "192.168.69.2"
STA_FAKE_IP = "192.168.68.2"
00023
class HostapdTest(unittest.TestCase):
    """Integration tests for the hostapd access point nodes.

    The tests reconfigure two dynamic_reconfigure servers ("hostapd1" and
    "hostapd2") and verify the outcome through the returned configuration,
    through ``iwconfig`` output (see IwconfigInfo), through a station-side
    scan, and through UDP link tests run by the 'performance_test' source.
    """

    def __init__(self, *args):
        """Join the ROS graph and bring both access points to a known state.

        Reads the private parameters ``~ap1_iface``, ``~ap2_iface`` and
        ``~sta_iface``, pushes ``reset_params`` to both access points and
        cancels any link test still running on the source node.
        """
        super(HostapdTest, self).__init__(*args)
        rospy.init_node('hostapd_access_point_test')
        self.ap1_iface = rospy.get_param('~ap1_iface')
        self.ap2_iface = rospy.get_param('~ap2_iface')
        self.sta_iface = rospy.get_param('~sta_iface')

        self.dyn_ap1 = dynamic_reconfigure.client.Client("hostapd1")
        self.dyn_ap2 = dynamic_reconfigure.client.Client("hostapd2")
        # Baseline configuration applied before the tests and after each one.
        self.reset_params = {
            "enabled": False,
            "ssid": "test",
            "wmm": False,
            "mode": 'b',
            "freq": 2412e6,
            "ieee80211n": False,
            "encryption_mode": "open",
            "encryption_pass": "",
            # A real boolean, like every other update in this class (this
            # used to be the string "True").
            "txpower_auto": True,
            "txpower": 0,
            "bitrate": 0,
        }
        self.dyn_ap1.update_configuration(self.reset_params)
        self.dyn_ap2.update_configuration(self.reset_params)

        self.hwsim_nat_setup_path = \
            roslib.packages.find_node('hostapd_access_point', 'hwsim_nat_setup.sh')

        self.srcnode = UdpmonsourceHandle('performance_test')
        self.srcnode.cancel_all_tests()

    def setUp(self):
        """Nothing to prepare per test; the baseline is applied elsewhere."""
        pass

    def tearDown(self):
        """Cancel link tests, reset both access points, stop wpa_supplicant."""
        self.srcnode.cancel_all_tests()
        self.dyn_ap1.update_configuration(self.reset_params)
        self.dyn_ap2.update_configuration(self.reset_params)
        self._stop_wpa_supplicant()

    # ------------------------------------------------------------------
    # Private helpers (names must not start with "test")
    # ------------------------------------------------------------------

    @staticmethod
    def _call_quiet(cmd):
        """Run ``cmd`` with its output discarded and return its exit status.

        Output is sent to os.devnull instead of a pipe: subprocess.call()
        never reads from a pipe, so a child writing more than the pipe
        buffer can hold would block and the call would never return.
        """
        import os  # 'os' is not among the imports at the top of this file
        with open(os.devnull, 'w') as devnull:
            return subprocess.call(cmd, stdout=devnull, stderr=subprocess.STDOUT)

    def _stop_wpa_supplicant(self):
        """Ask wpa_supplicant on the station interface to terminate.

        Best effort: the exit status of wpa_cli is ignored on purpose, since
        no supplicant may be running.
        """
        self._call_quiet(["wpa_cli", "-i", self.sta_iface, "terminate"])

    def _assert_ok(self, config):
        """Fail unless the reconfigure result ``config`` has status "OK"."""
        self.assertEqual(config['status'], "OK",
                         "Operation failed: " + config['errmsg'])

    def _assert_txpower(self, config, expected):
        """Check that both the returned config and iwconfig report ``expected`` dBm."""
        self._assert_ok(config)
        self.assertEqual(config['txpower'], expected,
                         "Expected configuration txpower to be %d instead it was %d"%
                         (expected, config['txpower']))
        actual = IwconfigInfo(self.ap1_iface).txpower()
        self.assertEqual(actual, expected,
                         "Expected iwconfig txpower to be %d instead it was %d"%
                         (expected, actual))

    def _link_is_down(self):
        """Run one UDP link test towards the AP and return its linkdown() result.

        Sleeps 5 s before starting (presumably to let the station
        (re)associate -- TODO confirm), then runs a 5 s test and waits 5.5 s
        so that the test has finished before its state is read.
        """
        time.sleep(5.0)
        test = self.srcnode.create_test(bw = 2.0 * 10**6, pktsize = 1500, duration = 5.0,
                                        sink_ip = AP_FAKE_IP, sink_port = 12345,
                                        update_interval = 0.2)
        test.start()
        time.sleep(5.5)
        return test.linkdown()

    # ------------------------------------------------------------------
    # Environment setup used by the tests
    # ------------------------------------------------------------------

    def setup_nat_rules(self):
        """Run hwsim_nat_setup.sh for the ap1/station pair and assert it succeeded."""
        p = subprocess.Popen([self.hwsim_nat_setup_path,
                              self.ap1_iface, AP_REAL_IP, AP_FAKE_IP,
                              self.sta_iface, STA_REAL_IP, STA_FAKE_IP],
                             stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
        # stderr is merged into stdout, so 'out' carries all script output.
        out = p.communicate()[0]
        self.assertEqual(p.returncode, 0,
                         "Setting hwsim NAT rules on %s and %s failed: %s"%
                         (self.ap1_iface, self.sta_iface, out))

    def start_wpa_supplicant(self, conffile):
        """Start a background wpa_supplicant on the station interface.

        ``conffile`` is the name of a configuration file looked up in the
        network_control_tests package; the first match is used. The test
        fails if the file cannot be found or wpa_supplicant exits non-zero.
        """
        candidates = roslib.packages.find_resource("network_control_tests", conffile)
        # Report a missing resource as a test failure, not an IndexError.
        self.assertTrue(candidates,
                        "Could not find %s in package network_control_tests"%
                        (conffile))
        supp_conf_path = candidates[0]
        ret = self._call_quiet(["wpa_supplicant", "-Dnl80211", "-i" + self.sta_iface,
                                "-B", "-c" + supp_conf_path])
        self.assertEqual(ret, 0,
                         "Starting wpa_supplicant with conf %s on %s failed with ret code %d"%
                         (conffile, self.sta_iface, ret))

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    def test_freq(self):
        """Frequency changes are applied, and rejected when invalid for the mode."""
        freq = IEEE80211_Channels.get_freq(5, IEEE80211_Channels.BAND_2400_MHz)
        config = self.dyn_ap1.update_configuration({"enabled": True, "ssid": "testnet1",
                                                    "mode": 'b', "freq": freq})
        self._assert_ok(config)
        ap1_info = IwconfigInfo(self.ap1_iface)
        self.assertEqual(ap1_info.freq(), freq,
                         "Expected freq to be %ld, but instead it was %ld"%(freq, ap1_info.freq()))

        # A 5 GHz band frequency must be refused while still in mode 'b'.
        freq = IEEE80211_Channels.get_freq(44, IEEE80211_Channels.BAND_5000_MHz)
        config = self.dyn_ap1.update_configuration({"freq": freq})
        ap1_info = IwconfigInfo(self.ap1_iface)
        self.assertEqual(config['status'], "FAIL",
                         "Expected setting an 802.11a freq when in 802.11b mode to fail: %s"%
                         (ap1_info.out))

        # The same frequency is accepted once the mode is switched to 'a'.
        config = self.dyn_ap1.update_configuration({"freq": freq, "mode": "a", "enabled": True})
        self._assert_ok(config)
        ap1_info = IwconfigInfo(self.ap1_iface)
        self.assertEqual(ap1_info.freq(), freq,
                         "Expected freq to be %ld, but instead it was %ld"%(freq, ap1_info.freq()))

    def test_txpower(self):
        """Manual tx power is applied; out-of-range requests end up at 0 or 20."""
        config = self.dyn_ap1.update_configuration({"enabled": True, "ssid": "testnet1",
                                                    "mode": 'b', "txpower_auto": False,
                                                    "txpower": 0})
        self._assert_txpower(config, 0)

        # (requested, expected): -10 is expected to become 0 and 30 to become 20.
        for requested, expected in ((10, 10), (-10, 0), (30, 20)):
            config = self.dyn_ap1.update_configuration({"txpower": requested})
            self._assert_txpower(config, expected)

    def test_bitrate(self):
        """Legal bitrates are accepted in mode 'g'; 3 Mbit/s is rejected."""
        config = self.dyn_ap1.update_configuration({"enabled": True, "ssid": "testnet1",
                                                    "mode": 'g', "bitrate": 1*10**6})
        self._assert_ok(config)
        self.assertEqual(config['bitrate'], 1*10**6,
                         "Expected configuration bitrate to be %d instead it was %d"%
                         (1*10**6, config['bitrate']))

        config = self.dyn_ap1.update_configuration({"bitrate": 54*10**6})
        self._assert_ok(config)
        self.assertEqual(config['bitrate'], 54*10**6,
                         "Expected configuration bitrate to be %d instead it was %d"%
                         (54*10**6, config['bitrate']))

        config = self.dyn_ap1.update_configuration({"bitrate": 3*10**6})
        self.assertEqual(config['status'], "FAIL",
                         "Setting an illegal bitrate of 3Mbit/s should have failed")

    def test_txpower_auto(self):
        """With txpower_auto on, the reported tx power is the maximum (20)."""
        config = self.dyn_ap1.update_configuration({"enabled": True, "ssid": "testnet1",
                                                    "txpower_auto": True, "txpower": 0})
        self._assert_ok(config)
        self.assertTrue(config['txpower_auto'],
                        "Expected txpower_auto to be True")
        self.assertEqual(config['txpower'], 20,
                         "Expected txpower to be maximum (20) instead it was %d"%(config['txpower']))

    def test_start_stop(self):
        """The access point can be stopped and restarted three times in a row."""
        config = self.dyn_ap1.update_configuration({"enabled": True, "ssid": "testnet1"})
        self._assert_ok(config)
        self.assertTrue(config['enabled'],
                        "Expected hostapd to be running")
        for _ in range(3):
            config = self.dyn_ap1.update_configuration({"enabled": False})
            self._assert_ok(config)
            self.assertFalse(config['enabled'],
                             "Expected hostapd to be stopped")
            config = self.dyn_ap1.update_configuration({"enabled": True})
            self._assert_ok(config)
            self.assertTrue(config['enabled'],
                            "Expected hostapd to be running")

    def test_multiple_aps(self):
        """Both access points run at once and both SSIDs show up in a scan."""
        config = self.dyn_ap1.update_configuration({"enabled": True, "ssid": "testnet1"})
        self._assert_ok(config)
        self.assertTrue(config['enabled'],
                        "Expected hostapd to be running")
        config = self.dyn_ap2.update_configuration({"enabled": True, "ssid": "testnet2"})
        self._assert_ok(config)
        self.assertTrue(config['enabled'],
                        "Expected hostapd to be running")

        ret = self._call_quiet(["ifconfig", self.sta_iface, "up"])
        self.assertEqual(ret, 0,
                         "ifup on interface %s failed with ret code %d"%
                         (self.sta_iface, ret))

        p = subprocess.Popen(["iwlist", self.sta_iface, "scan"],
                             stdout = subprocess.PIPE, stderr = subprocess.PIPE)
        out = p.communicate()[0]
        for ssid in ("testnet1", "testnet2"):
            self.assertTrue(ssid in out,
                            ("Expected %s ssid to be found in station scan: " % ssid) + out)

    def test_ieee80211n(self):
        """Enabling 802.11n is accepted."""
        config = self.dyn_ap1.update_configuration({"ieee80211n": True, "enabled": True})
        self._assert_ok(config)

    def disabled_test_wmm(self):
        """Enabling WMM is accepted (not collected: name lacks the 'test' prefix)."""
        config = self.dyn_ap1.update_configuration({"wmm": True, "enabled": True})
        self._assert_ok(config)

    def test_encryption(self):
        """The link is up only when AP and station agree on WPA mode and passphrase."""
        # 1) AP in WPA mode with a passphrase the station's WPA conf does not match.
        config = self.dyn_ap1.update_configuration({"ssid": "testnet1",
                                                    "enabled": True, "encryption_mode": "wpa",
                                                    "encryption_pass": "violetsareblue"})
        self._assert_ok(config)
        self.setup_nat_rules()
        self.start_wpa_supplicant("wpa_supplicant_wpa.conf")
        self.assertTrue(self._link_is_down(),
                        "Link should be down since WPA pass is wrong")

        # 2) Same mode, passphrase changed on the AP side.
        config = self.dyn_ap1.update_configuration({"encryption_pass": "rosesarered"})
        self._assert_ok(config)
        # NOTE(review): the NAT rules are re-applied after every AP
        # reconfiguration; presumably the reconfiguration drops them -- confirm.
        self.setup_nat_rules()
        self.assertFalse(self._link_is_down(),
                         "Link should be up since WPA pass is now correct")

        # 3) Station restarted with a WPA2 conf while the AP is still WPA-only.
        self._stop_wpa_supplicant()
        self.start_wpa_supplicant("wpa_supplicant_wpa2.conf")
        self.assertTrue(self._link_is_down(),
                        "Link should be down since client wants WPA2, but AP is in WPA mode")

        # 4) AP switched to mixed WPA/WPA2 mode.
        config = self.dyn_ap1.update_configuration({"encryption_mode": "wpa_wpa2"})
        self._assert_ok(config)
        self.setup_nat_rules()
        self.assertFalse(self._link_is_down(),
                         "Link should be up because AP is in WPA-WPA2 mode, and STA in WPA2")
00315
00316
class IwconfigInfo:
    """Snapshot of ``iwconfig <interface>`` output with small parsing helpers.

    The command runs once, in the constructor; every accessor parses the
    captured text in ``self.out``, so a new instance is needed to observe a
    later state of the interface.
    """

    def __init__(self, interface):
        """Run ``iwconfig interface`` and keep its standard output.

        Raises:
            IOError: if iwconfig exits with a non-zero status; the error
                carries the exit status and the captured stderr.
        """
        # Text mode, so the str regexes below also work on interpreters
        # where pipes yield bytes.
        p = subprocess.Popen(["iwconfig", interface],
                             stdout = subprocess.PIPE, stderr = subprocess.PIPE,
                             universal_newlines = True)
        (self.out, err) = p.communicate()
        if p.returncode != 0:
            raise IOError(p.returncode, "iwconfig on %s failed: %s"%(interface, err))

    def is_master(self):
        """Return a non-empty (truthy) list if the output has ``Mode:?aster``."""
        return re.findall(r'Mode:\s*(.aster)', self.out)

    def is_station(self):
        """Return a non-empty (truthy) list if the output has ``Mode:?anaged``."""
        return re.findall(r'Mode:\s*(.anaged)', self.out)

    def freq(self):
        """Return the reported frequency in Hz as an integer.

        Raises:
            IOError: if no ``Frequency:<x> GHz`` field is present.
        """
        freq = re.findall(r'Frequency:\s*(\S+)\s*GHz', self.out)
        if not freq:
            raise IOError(-1, "Could not find frequency in iwconfig output: %s"%(self.out))
        # Round instead of truncating: the GHz text is parsed as a binary
        # float, and a product that lands just below the integer would
        # otherwise come out 1 Hz too low.
        return int(round(float(freq[0]) * 1e9))

    def txpower(self):
        """Return the reported transmit power in dBm as an integer.

        Raises:
            IOError: if no ``Tx-Power=<n> dBm`` field is present.
        """
        txpower = re.findall(r'Tx-Power=\s*(\S+)\s*dBm', self.out)
        if not txpower:
            raise IOError(-1, "Could not find tx power in iwconfig output: %s"%(self.out))
        return int(txpower[0])

    def ap(self):
        """Return the associated access point's address, or None if there is none.

        Raises:
            TypeError: if the interface is itself in master mode.
        """
        if self.is_master():
            raise TypeError("Interface is in master mode, can't be associated to an AP")

        ap = re.findall(r'Access Point: \s*(\w\w:\w\w:\w\w:\w\w:\w\w:\w\w)', self.out)
        if ap:
            return ap[0]
        return None

    def associated(self):
        """Return True if an access point address is reported."""
        return self.ap() is not None
00356
if __name__ == '__main__':
    # Hand the test case to rostest under package 'network_control_tests',
    # test name 'hostapd_test'.
    try:
        rostest.run('network_control_tests', 'hostapd_test', HostapdTest)
    except KeyboardInterrupt:
        # Ctrl-C is swallowed on purpose so the script exits without a
        # traceback. The old "except KeyboardInterrupt, e" form is a syntax
        # error on Python 3 and bound a name that was never used.
        pass