00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import os
00035 import sys
00036 import unittest
00037 import cStringIO
00038 import time
00039
00040 import rosunit
00041
00042 from subprocess import Popen, PIPE, check_call, call
00043
00044 from contextlib import contextmanager
00045
00046 @contextmanager
00047 def fakestdout():
00048 realstdout = sys.stdout
00049 fakestdout = cStringIO.StringIO()
00050 sys.stdout = fakestdout
00051 yield fakestdout
00052 sys.stdout = realstdout
00053
00054 def todict(s):
00055 d = {}
00056 for l in s.split('\n'):
00057 key, p, val = l.partition(':')
00058 if p:
00059 d[key] = val.strip()
00060 return d
00061
00062 class TestRosservice(unittest.TestCase):
00063
00064 def setUp(self):
00065 pass
00066
00067 def test_get_service_headers(self):
00068 import rosservice
00069 orig_uri = os.environ['ROS_MASTER_URI']
00070 os.environ['ROS_MASTER_URI'] = 'http://fake_host:12356'
00071 try:
00072 c = 'rosservice'
00073
00074
00075 try:
00076 rosservice.get_service_headers('/add_two_ints', 'fake://localhost:1234')
00077 self.fail("should have raised")
00078 except rosservice.ROSServiceException: pass
00079 try:
00080 rosservice.get_service_headers('/add_two_ints', 'rosrpc://fake_host:1234')
00081 self.fail("should have raised IO exc")
00082 except rosservice.ROSServiceIOException: pass
00083
00084
00085 finally:
00086 os.environ['ROS_MASTER_URI'] = orig_uri
00087
00088 def test_get_service_type(self):
00089 import rosservice
00090 self.assertEquals('test_rosmaster/AddTwoInts', rosservice.get_service_type('/add_two_ints'))
00091 self.assertEquals(None, rosservice.get_service_type('/fake_add_two_ints'))
00092
00093 def test_offline(self):
00094 import rosservice
00095 orig_uri = os.environ['ROS_MASTER_URI']
00096 os.environ['ROS_MASTER_URI'] = 'http://fake_host:12356'
00097
00098 try:
00099 c = 'rosservice'
00100
00101 try:
00102 rosservice.get_service_type('/add_two_ints')
00103 self.fail("should have raised ROSServiceIOException")
00104 except rosservice.ROSServiceIOException:
00105 pass
00106
00107 try:
00108 rosservice._rosservice_cmd_list([c, 'list'])
00109 self.fail("should have raised ROSServiceIOException")
00110 except rosservice.ROSServiceIOException: pass
00111
00112 try:
00113 rosservice._rosservice_cmd_info([c, 'info', '/add_two_ints'])
00114 self.fail("should have raised ROSServiceIOException")
00115 except rosservice.ROSServiceIOException: pass
00116
00117 try:
00118 rosservice._rosservice_cmd_type([c, 'type', '/add_two_ints'])
00119 self.fail("should have raised ROSServiceIOException")
00120 except rosservice.ROSServiceIOException: pass
00121
00122 try:
00123 rosservice._rosservice_cmd_uri([c, 'uri', '/add_two_ints'])
00124 self.fail("should have raised ROSServiceIOException")
00125 except rosservice.ROSServiceIOException: pass
00126
00127 try:
00128 rosservice._rosservice_cmd_find([c, 'find', 'test_ros/AddTwoInts'])
00129 self.fail("should have raised ROSServiceIOException")
00130 except rosservice.ROSServiceIOException: pass
00131
00132 try:
00133 rosservice._rosservice_cmd_call([c, 'call', '/add_two_ints', '1', '2'])
00134 self.fail("should have raised ROSServiceIOException")
00135 except rosservice.ROSServiceIOException: pass
00136
00137 finally:
00138 os.environ['ROS_MASTER_URI'] = orig_uri
00139
00140 def test_cmd_type(self):
00141 import rosservice
00142 cmd = 'rosservice'
00143 s = '/add_two_ints'
00144 try:
00145 rosservice.rosservicemain([cmd, 'type', '/fake_service'])
00146 self.fail("should have triggered error exit")
00147 except SystemExit:
00148 pass
00149
00150 for s in ['/add_two_ints', 'add_two_ints', 'foo/add_two_ints']:
00151 with fakestdout() as b:
00152 rosservice.rosservicemain([cmd, 'type', s])
00153 v = b.getvalue().strip()
00154 self.assertEquals('test_rosmaster/AddTwoInts', v)
00155
00156 def test_cmd_uri(self):
00157 import rosservice
00158 cmd = 'rosservice'
00159 with fakestdout() as b:
00160 try:
00161 rosservice.rosservicemain([cmd, 'uri', '/fake_service'])
00162 self.fail("should have triggered error exit")
00163 except SystemExit:
00164 pass
00165
00166 for s in ['/add_two_ints', 'add_two_ints', 'foo/add_two_ints']:
00167 with fakestdout() as b:
00168 rosservice.rosservicemain([cmd, 'uri', s])
00169 v = b.getvalue().strip()
00170 self.assert_(v.startswith('rosrpc://'), v)
00171
00172
00173 def test_cmd_node(self):
00174 import rosservice
00175 cmd = 'rosservice'
00176 for s in ['/add_two_ints', 'add_two_ints', 'foo/add_two_ints']:
00177 with fakestdout() as b:
00178 rosservice.rosservicemain([cmd, 'node', s])
00179 v = b.getvalue().strip()
00180 if 'foo' in s:
00181 self.assertEquals('/foo/a2iserver', v)
00182 else:
00183 self.assertEquals('/a2iserver', v)
00184 try:
00185 rosservice.rosservicemain([cmd, 'node', '/fake_two_ints'])
00186 self.fail("should have exited with error")
00187 except SystemExit: pass
00188
00189 def test_full_usage(self):
00190 import rosservice
00191 try:
00192 rosservice._fullusage()
00193 self.fail("should have caused system exit")
00194 except SystemExit: pass
00195
00196 def test_cmd_info(self):
00197 import rosservice
00198 cmd = 'rosservice'
00199
00200 try:
00201 rosservice.rosservicemain([cmd, 'info'])
00202 self.fail("should have exited with error")
00203 except SystemExit: pass
00204 try:
00205 rosservice.rosservicemain([cmd, 'info', '/fake_service'])
00206 self.fail("should have exited with error")
00207 except SystemExit: pass
00208 try:
00209 rosservice.rosservicemain([cmd, 'info', '/add_two_ints', '/foo/add_two_ints'])
00210 self.fail("should have exited with error")
00211 except SystemExit: pass
00212
00213 for s in ['/add_two_ints', 'add_two_ints', 'foo/add_two_ints']:
00214 with fakestdout() as b:
00215 rosservice.rosservicemain([cmd, 'info', s])
00216 d = todict(b.getvalue())
00217 if 'foo' in s:
00218 self.assertEquals('/foo/a2iserver', d['Node'])
00219 else:
00220 self.assertEquals('/a2iserver', d['Node'], repr(d['Node']))
00221 self.assertEquals('test_rosmaster/AddTwoInts', d['Type'])
00222 self.assertEquals('a b', d['Args'])
00223 self.assert_('URI' in d)
00224
00225 def test_cmd_find(self):
00226 import rosservice
00227 cmd = 'rosservice'
00228
00229 try:
00230 rosservice.rosservicemain([cmd, 'find'])
00231 self.fail("arg parsing should have failed")
00232 except SystemExit: pass
00233 try:
00234 rosservice.rosservicemain([cmd, 'find', 'test_ros/AddTwoInts', 'test/AddThreeInts'])
00235 self.fail("arg parsing should have failed")
00236 except SystemExit: pass
00237
00238 v = set(['/add_two_ints', '/bar/add_two_ints', '/foo/add_two_ints'])
00239 with fakestdout() as b:
00240 rosservice.rosservicemain([cmd, 'find', 'test_rosmaster/AddTwoInts'])
00241 d = set([x for x in b.getvalue().split('\n') if x.strip()])
00242 self.assertEquals(v, d)
00243
00244 with fakestdout() as b:
00245 rosservice.rosservicemain([cmd, 'find', 'fake/AddTwoInts'])
00246 self.assertEquals('', b.getvalue().strip())
00247
00248 def test_get_service_class_by_name(self):
00249 import rosservice
00250 try:
00251 rosservice.get_service_class_by_name('fake')
00252 self.fail("should have raised")
00253 except rosservice.ROSServiceException, e:
00254 self.assertEquals("Service [fake] is not available.", str(e))
00255
00256 def test_cmd_call(self):
00257 import rosservice
00258 cmd = 'rosservice'
00259
00260 try:
00261 rosservice.rosservicemain([cmd, 'call'])
00262 self.fail("arg parsing should have failed")
00263 except SystemExit: pass
00264 try:
00265 rosservice.rosservicemain([cmd, 'call', 'add_two_ints', '1', '2', '3'])
00266 self.fail("should have failed with too many args")
00267 except SystemExit: pass
00268
00269
00270 def setUp(self):
00271
00272
00273 import rosservice
00274 services = ['/add_two_ints',
00275 '/foo/add_two_ints',
00276 '/bar/add_two_ints',
00277 ]
00278
00279 import time
00280 timeout_t = time.time() + 10.
00281 while time.time() < timeout_t:
00282 with fakestdout() as b:
00283 rosservice._rosservice_cmd_list(['rosservice', 'list'])
00284 v = [x.strip() for x in b.getvalue().split('\n') if x.strip()]
00285 if not (set(services) - set(v) ):
00286 return
00287 self.fail("timeout")
00288
00289 def test_cmd_list(self):
00290 import rosservice
00291 cmd = 'rosservice'
00292 s = '/add_two_ints'
00293
00294
00295 rosservice.rosservicemain([cmd, 'list'])
00296
00297
00298 services = ['/add_two_ints',
00299 '/foo/add_two_ints',
00300 '/bar/add_two_ints',
00301 '/header_echo',
00302 ]
00303 services_nodes = ['/add_two_ints /a2iserver',
00304 '/foo/add_two_ints /foo/a2iserver',
00305 '/bar/add_two_ints /bar/a2iserver',
00306 '/header_echo /headerserver',
00307 ]
00308
00309 with fakestdout() as b:
00310 rosservice._rosservice_cmd_list([cmd, 'list'])
00311 v = [x.strip() for x in b.getvalue().split('\n') if x.strip()]
00312 v = [x for x in v if not x.startswith('/rosout/')]
00313 v = [x for x in v if not x.endswith('/get_loggers') and not x.endswith('/set_logger_level')]
00314 self.assertEquals(set(services), set(v))
00315 with fakestdout() as b:
00316 rosservice._rosservice_cmd_list([cmd, 'list', '-n'])
00317 v = [x.strip() for x in b.getvalue().split('\n') if x.strip()]
00318 v = [x for x in v if not x.startswith('/rosout/')]
00319 v = [x for x in v if x.find('/get_loggers ') == -1 and x.find('/set_logger_level ') == -1]
00320 self.assertEquals(set(services_nodes), set(v))
00321 with fakestdout() as b:
00322 rosservice._rosservice_cmd_list([cmd, 'list', '--nodes'])
00323 v = [x.strip() for x in b.getvalue().split('\n') if x.strip()]
00324 v = [x for x in v if not x.startswith('/rosout/')]
00325 v = [x for x in v if x.find('/get_loggers ') == -1 and x.find('/set_logger_level ') == -1]
00326 self.assertEquals(set(services_nodes), set(v))
00327
00328
00329 try:
00330 rosservice._rosservice_cmd_list([cmd, 'list', s, s])
00331 self.fail("should have caused parser error")
00332 except SystemExit:
00333 pass
00334
00335
00336 for s in services:
00337 with fakestdout() as b:
00338 rosservice._rosservice_cmd_list([cmd, 'list', s])
00339 self.assertEquals(s, b.getvalue().strip())
00340
00341
00342 s = 'add_two_ints'
00343 with fakestdout() as b:
00344 rosservice._rosservice_cmd_list([cmd, 'list', s])
00345 self.assertEquals('/add_two_ints', b.getvalue().strip())
00346 with fakestdout() as b:
00347 rosservice._rosservice_cmd_list([cmd, 'list', s, '-n'])
00348 self.assertEquals('/add_two_ints /a2iserver', b.getvalue().strip())
00349 with fakestdout() as b:
00350 rosservice._rosservice_cmd_list([cmd, 'list', s, '--nodes'])
00351 self.assertEquals('/add_two_ints /a2iserver', b.getvalue().strip())
00352
00353
00354 s = '/foo'
00355 rosservice._rosservice_cmd_list([cmd, 'list', s])
00356 rosservice._rosservice_cmd_list([cmd, 'list', s, '-n'])
00357 rosservice._rosservice_cmd_list([cmd, 'list', s, '--nodes'])
00358 s = 'foo'
00359 rosservice._rosservice_cmd_list([cmd, 'list', s])
00360 rosservice._rosservice_cmd_list([cmd, 'list', s, '-n'])
00361 rosservice._rosservice_cmd_list([cmd, 'list', s, '--nodes'])
00362
00363
00364 NAME = 'test_rosservice'
00365 if __name__ == '__main__':
00366 rosunit.unitrun('test_rosservice', NAME, TestRosservice, sys.argv, coverage_packages=['rosservice'])