00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 import roslib; roslib.load_manifest('test_roslib')
00033
00034 import os
00035 import struct
00036 import sys
00037 import unittest
00038
00039 import roslib.network
00040 import rosunit
00041
class MockSock(object):
    """Minimal in-memory socket substitute backed by one buffer, ``data``.

    Everything written with send()/sendall() is appended to ``data``;
    recv() hands back the whole buffer and resets it to ''.
    """

    def __init__(self, data=''):
        # initial buffer contents, returned by the first recv()
        self.data = data

    def recv(self):
        """Return the entire buffered content and leave the buffer empty."""
        pending, self.data = self.data, ''
        return pending

    def sendall(self, d):
        """Append ``d`` to the buffer."""
        self.data = self.data + d

    def send(self, d):
        """Append ``d`` to the buffer and return ``len(d)``."""
        self.sendall(d)
        return len(d)
00054
00055
00056 class NetworkTest(unittest.TestCase):
00057
00058 def test_encode_ros_handshake_header(self):
00059 from roslib.network import encode_ros_handshake_header
00060 d = {}
00061 self.assertEquals(struct.pack('<I', 0), encode_ros_handshake_header(d))
00062 s = "a=b"
00063 d['a'] = 'b'
00064 encoded = struct.pack('<I', len(s))+s
00065 self.assertEquals(struct.pack('<I', len(encoded))+encoded,
00066 encode_ros_handshake_header({'a': 'b'}))
00067 d['c'] = 'd'
00068 s = "c=d"
00069 encoded = encoded+struct.pack('<I', len(s))+s
00070 self.assertEquals(struct.pack('<I', len(encoded))+encoded,
00071 encode_ros_handshake_header(d))
00072 d['rawtype'] = '#line 1\nline 2\nline 3\nline\t4\r\r\n'
00073 s = "rawtype=#line 1\nline 2\nline 3\nline\t4\r\r\n"
00074 encoded = encoded+struct.pack('<I', len(s))+s
00075 self.assertEquals(struct.pack('<I', len(encoded))+encoded,
00076 encode_ros_handshake_header(d))
00077
00078 def test_decode_ros_handshake_header(self):
00079 from roslib.network import decode_ros_handshake_header, ROSHandshakeException
00080
00081 invalids = ["field1","",]
00082
00083 invalids = [(struct.pack('<I', len(s)) + s) for s in invalids]
00084
00085 invalids = [(struct.pack('<I', len(s)) + s) for s in invalids]
00086
00087
00088 valid = "a=b"
00089 valid = struct.pack('<I', len(valid)) + valid
00090 invalids.append(struct.pack('<I', 123)+valid)
00091
00092 invalid = struct.pack('<I', 123)+'a=b'
00093 invalids.append(struct.pack("<I", len(invalid)) + invalid)
00094
00095 for i in invalids:
00096 try:
00097 decode_ros_handshake_header(i)
00098 self.fail("should have failed: %s"%i)
00099 except ROSHandshakeException: pass
00100
00101 self.assertEquals({}, decode_ros_handshake_header(struct.pack('<I', 0)))
00102
00103 tests = [
00104 ("a=b", {'a': 'b'}),
00105
00106 (" a =b", {'a': 'b'}),
00107 ('newlines=\n\n\n\n', {'newlines': '\n\n\n\n'}),
00108 ('equals=foo=bar=car', {'equals': 'foo=bar=car'}),
00109 ("spaces=one two three four",{'spaces': 'one two three four'}),
00110 ]
00111 for s, d in tests:
00112
00113 s = struct.pack('<I', len(s)+4) + struct.pack('<I', len(s)) + s
00114 self.assertEquals(d, decode_ros_handshake_header(s))
00115
00116
00117 tests = [ {'a': 'b', 'c': 'd'},
00118 {'spaces': ' ', 'tabs': '\t\t\t\t', 'equals': '====='},
00119 ]
00120 for t in tests:
00121 s = ''
00122 for k, v in t.iteritems():
00123 f = "%s=%s"%(k, v)
00124 s += struct.pack('<I', len(f)) + f
00125 s = struct.pack('<I', len(s)) + s
00126 self.assertEquals(t, decode_ros_handshake_header(s))
00127
00128 self.assertEquals(t, decode_ros_handshake_header(s+s))
00129
00130 def test_parse_http_host_and_port(self):
00131 from roslib.network import parse_http_host_and_port
00132 invalid = ['', 'http://', 'http://localhost:bar', None]
00133 for t in invalid:
00134 try:
00135 parse_http_host_and_port(t)
00136 self.fail("should have failed: %s"%t)
00137 except ValueError:
00138 pass
00139
00140 self.assertEquals(('localhost', 80), parse_http_host_and_port('http://localhost'))
00141 self.assertEquals(('localhost', 1234), parse_http_host_and_port('http://localhost:1234'))
00142 self.assertEquals(('localhost', 1), parse_http_host_and_port('http://localhost:1'))
00143 self.assertEquals(('willowgarage.com', 1), parse_http_host_and_port('http://willowgarage.com:1'))
00144
00145 def test_get_local_address(self):
00146
00147 from roslib.network import get_local_address
00148 a = get_local_address()
00149 self.assert_(type(a) == str)
00150 self.assert_(a)
00151
00152
00153 os.environ['ROS_IP'] = 'bar'
00154 self.assertEquals('bar', get_local_address())
00155 os.environ['ROS_HOSTNAME'] = 'foo'
00156 self.assertEquals('foo', get_local_address())
00157
00158 try:
00159 real_argv = sys.argv[:]
00160 sys.argv = real_argv[:] + ['__ip:=1.2.3.4']
00161 self.assertEquals('1.2.3.4', get_local_address())
00162 sys.argv = real_argv[:] + ['__hostname:=bar']
00163 self.assertEquals('bar', get_local_address())
00164 finally:
00165 sys.argv = real_argv
00166
00167 def test_get_local_addresses(self):
00168
00169 from roslib.network import get_local_addresses
00170 addrs = get_local_addresses()
00171 self.assert_(type(addrs) == list)
00172 self.assert_(len(addrs))
00173 for a in addrs:
00174 self.assert_(type(a) == str)
00175
00176
00177 os.environ['ROS_IP'] = 'bar'
00178 self.assertEquals(addrs, get_local_addresses())
00179 os.environ['ROS_HOSTNAME'] = 'foo'
00180 self.assertEquals(addrs, get_local_addresses())
00181
00182 def test_get_bind_address(self):
00183 from roslib.network import get_bind_address
00184 self.assertEquals('0.0.0.0', get_bind_address('foo'))
00185 self.assertEquals('127.0.0.1', get_bind_address('localhost'))
00186 self.assertEquals('127.0.0.1', get_bind_address('127.0.1.1'))
00187
00188
00189 os.environ['ROS_IP'] = 'bar'
00190 self.assertEquals('0.0.0.0', get_bind_address())
00191 self.assertEquals('0.0.0.0', get_bind_address('foo'))
00192 os.environ['ROS_IP'] = 'localhost'
00193 self.assertEquals('127.0.0.1', get_bind_address())
00194 self.assertEquals('0.0.0.0', get_bind_address('foo'))
00195 os.environ['ROS_HOSTNAME'] = 'bar'
00196 self.assertEquals('0.0.0.0', get_bind_address())
00197 self.assertEquals('0.0.0.0', get_bind_address('foo'))
00198 os.environ['ROS_HOSTNAME'] = 'localhost'
00199 self.assertEquals('127.0.0.1', get_bind_address())
00200 self.assertEquals('0.0.0.0', get_bind_address('foo'))
00201
00202 def test_get_host_name(self):
00203 from roslib.network import get_host_name
00204 self.assertEquals(type(get_host_name()), str)
00205
00206 os.environ['ROS_IP'] = 'foo'
00207 self.assertEquals('foo', get_host_name())
00208 os.environ['ROS_HOSTNAME'] = 'bar'
00209 self.assertEquals('bar', get_host_name())
00210
00211 try:
00212 real_argv = sys.argv[:]
00213 sys.argv = real_argv[:] + ['__ip:=1.2.3.4']
00214 self.assertEquals('1.2.3.4', get_host_name())
00215 sys.argv = real_argv[:] + ['__hostname:=baz']
00216 self.assertEquals('baz', get_host_name())
00217 finally:
00218 sys.argv = real_argv
00219
00220 def test_create_local_xmlrpc_uri(self):
00221 from roslib.network import parse_http_host_and_port, create_local_xmlrpc_uri
00222 self.assertEquals(type(create_local_xmlrpc_uri(1234)), str)
00223 os.environ['ROS_HOSTNAME'] = 'localhost'
00224 self.assertEquals(('localhost', 1234), parse_http_host_and_port(create_local_xmlrpc_uri(1234)))
00225
00226 def setUp(self):
00227 self._ros_hostname = self._ros_ip = None
00228 if 'ROS_HOSTNAME' in os.environ:
00229 self._ros_hostname = os.environ['ROS_HOSTNAME']
00230 del os.environ['ROS_HOSTNAME']
00231 if 'ROS_IP' in os.environ:
00232 self._ros_ip = os.environ['ROS_IP']
00233 del os.environ['ROS_IP']
00234
00235 def tearDown(self):
00236 if 'ROS_HOSTNAME' in os.environ:
00237 del os.environ['ROS_HOSTNAME']
00238 if 'ROS_IP' in os.environ:
00239 del os.environ['ROS_IP']
00240 if self._ros_hostname:
00241 os.environ['ROS_HOSTNAME'] = self._ros_hostname
00242 if self._ros_ip:
00243 os.environ['ROS_IP'] = self._ros_ip
00244
if __name__ == '__main__':
    # When executed as a script, hand NetworkTest to rosunit under the
    # test name 'test_network', requesting coverage for roslib.network.
    rosunit.unitrun(
        'test_roslib',
        'test_network',
        NetworkTest,
        coverage_packages=['roslib.network'])
00247