00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import os
00035 import sys
00036 import unittest
00037 try:
00038 from cStringIO import StringIO
00039 except ImportError:
00040 from io import StringIO
00041 import time
00042
00043 import rosunit
00044
00045 from subprocess import Popen, PIPE, check_call, call
00046
00047 from contextlib import contextmanager
00048
@contextmanager
def fakestdout():
    """Temporarily redirect ``sys.stdout`` into an in-memory buffer.

    Yields the ``StringIO`` buffer that receives everything written to
    ``sys.stdout`` inside the ``with`` body, so the caller can inspect it
    with ``getvalue()``.

    The previous ``sys.stdout`` is put back when the body finishes, whether
    it completes normally or raises; without the ``finally`` a failing
    assertion inside the body would leave stdout redirected for every
    subsequent test.
    """
    realstdout = sys.stdout
    buf = StringIO()
    sys.stdout = buf
    try:
        yield buf
    finally:
        sys.stdout = realstdout
00056
def todict(s):
    """Parse ``key: value`` lines of *s* into a dictionary.

    Each line is split at its first ``:``. Lines that contain no colon are
    skipped. Keys are kept verbatim (not stripped); values have surrounding
    whitespace removed. When a key occurs more than once the last
    occurrence wins.
    """
    fields = (line.partition(':') for line in s.split('\n'))
    return dict((key, val.strip()) for key, sep, val in fields if sep)
00064
class TestRosservice(unittest.TestCase):
    """Online tests for the ``rosservice`` library and command-line entry points.

    The tests query a running ROS master and expect ``add_two_ints``
    services in the global, ``/foo`` and ``/bar`` namespaces; the listing
    test additionally expects ``/header_echo``.
    """

    # NOTE(review): ``rosservice`` is imported inside every method, as in the
    # original code. Presumably this keeps the import under the coverage run
    # started by rosunit -- confirm before hoisting it to module level.

    def setUp(self):
        """Wait until the ``add_two_ints`` services are visible on the master.

        Polls ``rosservice list`` for up to 10 seconds and fails the test
        with "timeout" if any of the services is still missing.
        """
        # NOTE(review): '/header_echo' is asserted by test_cmd_list but is not
        # waited for here -- confirm whether it should be added.
        services = set(['/add_two_ints',
                        '/foo/add_two_ints',
                        '/bar/add_two_ints',
                        ])

        timeout_t = time.time() + 10.
        while time.time() < timeout_t:
            if not (services - set(self._list_lines(['rosservice', 'list']))):
                return
            # Pause between polls instead of hammering the master in a
            # tight loop.
            time.sleep(0.1)
        self.fail("timeout")

    def _list_output(self, argv):
        """Return everything ``_rosservice_cmd_list(argv)`` prints to stdout."""
        import rosservice
        with fakestdout() as b:
            rosservice._rosservice_cmd_list(argv)
        return b.getvalue()

    def _list_lines(self, argv):
        """Return the stripped, non-blank lines printed for *argv*."""
        return [x.strip() for x in self._list_output(argv).split('\n') if x.strip()]

    def test_get_service_headers(self):
        """get_service_headers raises for a bad URI scheme and an unreachable host."""
        import rosservice
        orig_uri = os.environ['ROS_MASTER_URI']
        os.environ['ROS_MASTER_URI'] = 'http://fake_host:12356'
        try:
            # 'fake://' is not a rosrpc URI.
            self.assertRaises(rosservice.ROSServiceException,
                              rosservice.get_service_headers,
                              '/add_two_ints', 'fake://localhost:1234')
            # Well-formed URI, but the host cannot be contacted.
            self.assertRaises(rosservice.ROSServiceIOException,
                              rosservice.get_service_headers,
                              '/add_two_ints', 'rosrpc://fake_host:1234')
        finally:
            os.environ['ROS_MASTER_URI'] = orig_uri

    def test_get_service_type(self):
        """get_service_type returns the type name, or None for unknown services."""
        import rosservice
        self.assertEqual('test_rosmaster/AddTwoInts', rosservice.get_service_type('/add_two_ints'))
        self.assertEqual(None, rosservice.get_service_type('/fake_add_two_ints'))

    def test_offline(self):
        """Every master-dependent call raises ROSServiceIOException when the master URI is bogus."""
        import rosservice
        orig_uri = os.environ['ROS_MASTER_URI']
        os.environ['ROS_MASTER_URI'] = 'http://fake_host:12356'

        try:
            c = 'rosservice'
            io_exc = rosservice.ROSServiceIOException

            self.assertRaises(io_exc, rosservice.get_service_type, '/add_two_ints')
            self.assertRaises(io_exc, rosservice._rosservice_cmd_list, [c, 'list'])
            self.assertRaises(io_exc, rosservice._rosservice_cmd_info, [c, 'info', '/add_two_ints'])
            self.assertRaises(io_exc, rosservice._rosservice_cmd_type, [c, 'type', '/add_two_ints'])
            self.assertRaises(io_exc, rosservice._rosservice_cmd_uri, [c, 'uri', '/add_two_ints'])
            self.assertRaises(io_exc, rosservice._rosservice_cmd_find, [c, 'find', 'test_ros/AddTwoInts'])
            self.assertRaises(io_exc, rosservice._rosservice_cmd_call, [c, 'call', '/add_two_ints', '1', '2'])
        finally:
            os.environ['ROS_MASTER_URI'] = orig_uri

    def test_cmd_type(self):
        """``rosservice type`` prints the service type and exits on unknown services."""
        import rosservice
        cmd = 'rosservice'
        self.assertRaises(SystemExit, rosservice.rosservicemain, [cmd, 'type', '/fake_service'])

        for s in ['/add_two_ints', 'add_two_ints', 'foo/add_two_ints']:
            with fakestdout() as b:
                rosservice.rosservicemain([cmd, 'type', s])
            v = b.getvalue().strip()
            self.assertEqual('test_rosmaster/AddTwoInts', v)

    def test_cmd_uri(self):
        """``rosservice uri`` prints a rosrpc URI and exits on unknown services."""
        import rosservice
        cmd = 'rosservice'
        # Output of the failing call is captured and discarded.
        with fakestdout():
            self.assertRaises(SystemExit, rosservice.rosservicemain, [cmd, 'uri', '/fake_service'])

        for s in ['/add_two_ints', 'add_two_ints', 'foo/add_two_ints']:
            with fakestdout() as b:
                rosservice.rosservicemain([cmd, 'uri', s])
            v = b.getvalue().strip()
            self.assertTrue(v.startswith('rosrpc://'), v)

    def test_cmd_node(self):
        """``rosservice node`` prints the providing node and exits on unknown services."""
        import rosservice
        cmd = 'rosservice'
        for s in ['/add_two_ints', 'add_two_ints', 'foo/add_two_ints']:
            with fakestdout() as b:
                rosservice.rosservicemain([cmd, 'node', s])
            v = b.getvalue().strip()
            if 'foo' in s:
                self.assertEqual('/foo/a2iserver', v)
            else:
                self.assertEqual('/a2iserver', v)
        self.assertRaises(SystemExit, rosservice.rosservicemain, [cmd, 'node', '/fake_two_ints'])

    def test_full_usage(self):
        """_fullusage terminates the process via SystemExit."""
        import rosservice
        self.assertRaises(SystemExit, rosservice._fullusage)

    def test_cmd_info(self):
        """``rosservice info`` validates its arguments and prints Node/Type/Args/URI."""
        import rosservice
        cmd = 'rosservice'

        # Missing argument, unknown service, and too many arguments all exit.
        self.assertRaises(SystemExit, rosservice.rosservicemain, [cmd, 'info'])
        self.assertRaises(SystemExit, rosservice.rosservicemain, [cmd, 'info', '/fake_service'])
        self.assertRaises(SystemExit, rosservice.rosservicemain,
                          [cmd, 'info', '/add_two_ints', '/foo/add_two_ints'])

        for s in ['/add_two_ints', 'add_two_ints', 'foo/add_two_ints']:
            with fakestdout() as b:
                rosservice.rosservicemain([cmd, 'info', s])
            d = todict(b.getvalue())
            if 'foo' in s:
                self.assertEqual('/foo/a2iserver', d['Node'])
            else:
                self.assertEqual('/a2iserver', d['Node'], repr(d['Node']))
            self.assertEqual('test_rosmaster/AddTwoInts', d['Type'])
            self.assertEqual('a b', d['Args'])
            self.assertTrue('URI' in d)

    def test_cmd_find(self):
        """``rosservice find`` lists services of a type and validates its arguments."""
        import rosservice
        cmd = 'rosservice'

        self.assertRaises(SystemExit, rosservice.rosservicemain, [cmd, 'find'])
        self.assertRaises(SystemExit, rosservice.rosservicemain,
                          [cmd, 'find', 'test_ros/AddTwoInts', 'test/AddThreeInts'])

        expected = set(['/add_two_ints', '/bar/add_two_ints', '/foo/add_two_ints'])
        with fakestdout() as b:
            rosservice.rosservicemain([cmd, 'find', 'test_rosmaster/AddTwoInts'])
        found = set([x for x in b.getvalue().split('\n') if x.strip()])
        self.assertEqual(expected, found)

        # An unknown type prints nothing.
        with fakestdout() as b:
            rosservice.rosservicemain([cmd, 'find', 'fake/AddTwoInts'])
        self.assertEqual('', b.getvalue().strip())

    def test_get_service_class_by_name(self):
        """get_service_class_by_name reports an unavailable service by name."""
        import rosservice
        try:
            rosservice.get_service_class_by_name('fake')
        except rosservice.ROSServiceException as e:
            self.assertEqual("Service [fake] is not available.", str(e))
        else:
            self.fail("should have raised")

    def test_cmd_call(self):
        """``rosservice call`` exits on missing or surplus arguments."""
        import rosservice
        cmd = 'rosservice'

        self.assertRaises(SystemExit, rosservice.rosservicemain, [cmd, 'call'])
        # add_two_ints takes two arguments; a third must be rejected.
        self.assertRaises(SystemExit, rosservice.rosservicemain,
                          [cmd, 'call', 'add_two_ints', '1', '2', '3'])

    def test_cmd_list(self):
        """``rosservice list`` with and without ``-n``/``--nodes`` and a name filter."""
        import rosservice
        cmd = 'rosservice'
        s = '/add_two_ints'

        # Exercise the main entry point; output goes to the real stdout and
        # is not checked.
        rosservice.rosservicemain([cmd, 'list'])

        services = ['/add_two_ints',
                    '/foo/add_two_ints',
                    '/bar/add_two_ints',
                    '/header_echo',
                    ]
        services_nodes = ['/add_two_ints /a2iserver',
                          '/foo/add_two_ints /foo/a2iserver',
                          '/bar/add_two_ints /bar/a2iserver',
                          '/header_echo /headerserver',
                          ]

        # Plain listing: drop rosout and per-node logger services before
        # comparing.
        v = self._list_lines([cmd, 'list'])
        v = [x for x in v if not x.startswith('/rosout/')]
        v = [x for x in v if not x.endswith('/get_loggers') and not x.endswith('/set_logger_level')]
        self.assertEqual(set(services), set(v))

        # Listing with node names: short and long option must agree.
        for flag in ['-n', '--nodes']:
            v = self._list_lines([cmd, 'list', flag])
            v = [x for x in v if not x.startswith('/rosout/')]
            v = [x for x in v if x.find('/get_loggers ') == -1 and x.find('/set_logger_level ') == -1]
            self.assertEqual(set(services_nodes), set(v))

        # Two positional arguments are a parser error.
        self.assertRaises(SystemExit, rosservice._rosservice_cmd_list, [cmd, 'list', s, s])

        # Listing an exact service name echoes just that name.
        for s in services:
            self.assertEqual(s, self._list_output([cmd, 'list', s]).strip())

        # A name without leading slash is listed as '/add_two_ints'.
        s = 'add_two_ints'
        self.assertEqual('/add_two_ints', self._list_output([cmd, 'list', s]).strip())
        self.assertEqual('/add_two_ints /a2iserver',
                         self._list_output([cmd, 'list', s, '-n']).strip())
        self.assertEqual('/add_two_ints /a2iserver',
                         self._list_output([cmd, 'list', s, '--nodes']).strip())

        # Namespace arguments: only checked for not raising; output goes to
        # the real stdout.
        for s in ['/foo', 'foo']:
            for flags in ([], ['-n'], ['--nodes']):
                rosservice._rosservice_cmd_list([cmd, 'list', s] + flags)
00365
00366
# Test name handed to rosunit as the second argument of unitrun.
NAME = 'test_rosservice'

if __name__ == '__main__':
    # Run the test case through rosunit, passing the command-line arguments
    # along and requesting coverage for the 'rosservice' package.
    rosunit.unitrun(
        'test_rosservice',
        NAME,
        TestRosservice,
        sys.argv,
        coverage_packages=['rosservice'],
    )