38 from cStringIO
import StringIO
40 from io
import StringIO
45 from subprocess
import Popen, PIPE, check_call, call
47 from contextlib
import contextmanager
51 realstdout = sys.stdout
52 fakestdout = StringIO()
53 sys.stdout = fakestdout
55 sys.stdout = realstdout
59 for l
in s.split(
'\n'):
60 key, p, val = l.partition(
':')
72 orig_uri = os.environ[
'ROS_MASTER_URI']
73 os.environ[
'ROS_MASTER_URI'] =
'http://fake_host:12356' 79 rosservice.get_service_headers(
'/add_two_ints',
'fake://localhost:1234')
80 self.fail(
"should have raised")
81 except rosservice.ROSServiceException:
pass 83 rosservice.get_service_headers(
'/add_two_ints',
'rosrpc://fake_host:1234')
84 self.fail(
"should have raised IO exc")
85 except rosservice.ROSServiceIOException:
pass 89 os.environ[
'ROS_MASTER_URI'] = orig_uri
93 self.assertEquals(
'test_rosmaster/AddTwoInts', rosservice.get_service_type(
'/add_two_ints'))
94 self.assertEquals(
None, rosservice.get_service_type(
'/fake_add_two_ints'))
98 orig_uri = os.environ[
'ROS_MASTER_URI']
99 os.environ[
'ROS_MASTER_URI'] =
'http://fake_host:12356' 105 rosservice.get_service_type(
'/add_two_ints')
106 self.fail(
"should have raised ROSServiceIOException")
107 except rosservice.ROSServiceIOException:
111 rosservice._rosservice_cmd_list([c,
'list'])
112 self.fail(
"should have raised ROSServiceIOException")
113 except rosservice.ROSServiceIOException:
pass 116 rosservice._rosservice_cmd_info([c,
'info',
'/add_two_ints'])
117 self.fail(
"should have raised ROSServiceIOException")
118 except rosservice.ROSServiceIOException:
pass 121 rosservice._rosservice_cmd_type([c,
'type',
'/add_two_ints'])
122 self.fail(
"should have raised ROSServiceIOException")
123 except rosservice.ROSServiceIOException:
pass 126 rosservice._rosservice_cmd_uri([c,
'uri',
'/add_two_ints'])
127 self.fail(
"should have raised ROSServiceIOException")
128 except rosservice.ROSServiceIOException:
pass 131 rosservice._rosservice_cmd_find([c,
'find',
'test_ros/AddTwoInts'])
132 self.fail(
"should have raised ROSServiceIOException")
133 except rosservice.ROSServiceIOException:
pass 136 rosservice._rosservice_cmd_call([c,
'call',
'/add_two_ints',
'1',
'2'])
137 self.fail(
"should have raised ROSServiceIOException")
138 except rosservice.ROSServiceIOException:
pass 141 os.environ[
'ROS_MASTER_URI'] = orig_uri
148 rosservice.rosservicemain([cmd,
'type',
'/fake_service'])
149 self.fail(
"should have triggered error exit")
153 for s
in [
'/add_two_ints',
'add_two_ints',
'foo/add_two_ints']:
155 rosservice.rosservicemain([cmd,
'type', s])
156 v = b.getvalue().strip()
157 self.assertEquals(
'test_rosmaster/AddTwoInts', v)
164 rosservice.rosservicemain([cmd,
'uri',
'/fake_service'])
165 self.fail(
"should have triggered error exit")
169 for s
in [
'/add_two_ints',
'add_two_ints',
'foo/add_two_ints']:
171 rosservice.rosservicemain([cmd,
'uri', s])
172 v = b.getvalue().strip()
173 self.assert_(v.startswith(
'rosrpc://'), v)
179 for s
in [
'/add_two_ints',
'add_two_ints',
'foo/add_two_ints']:
181 rosservice.rosservicemain([cmd,
'node', s])
182 v = b.getvalue().strip()
184 self.assertEquals(
'/foo/a2iserver', v)
186 self.assertEquals(
'/a2iserver', v)
188 rosservice.rosservicemain([cmd,
'node',
'/fake_two_ints'])
189 self.fail(
"should have exited with error")
190 except SystemExit:
pass 195 rosservice._fullusage()
196 self.fail(
"should have caused system exit")
197 except SystemExit:
pass 204 rosservice.rosservicemain([cmd,
'info'])
205 self.fail(
"should have exited with error")
206 except SystemExit:
pass 208 rosservice.rosservicemain([cmd,
'info',
'/fake_service'])
209 self.fail(
"should have exited with error")
210 except SystemExit:
pass 212 rosservice.rosservicemain([cmd,
'info',
'/add_two_ints',
'/foo/add_two_ints'])
213 self.fail(
"should have exited with error")
214 except SystemExit:
pass 216 for s
in [
'/add_two_ints',
'add_two_ints',
'foo/add_two_ints']:
218 rosservice.rosservicemain([cmd,
'info', s])
221 self.assertEquals(
'/foo/a2iserver', d[
'Node'])
223 self.assertEquals(
'/a2iserver', d[
'Node'], repr(d[
'Node']))
224 self.assertEquals(
'test_rosmaster/AddTwoInts', d[
'Type'])
225 self.assertEquals(
'a b', d[
'Args'])
226 self.assert_(
'URI' in d)
233 rosservice.rosservicemain([cmd,
'find'])
234 self.fail(
"arg parsing should have failed")
235 except SystemExit:
pass 237 rosservice.rosservicemain([cmd,
'find',
'test_ros/AddTwoInts',
'test/AddThreeInts'])
238 self.fail(
"arg parsing should have failed")
239 except SystemExit:
pass 241 v = set([
'/add_two_ints',
'/bar/add_two_ints',
'/foo/add_two_ints'])
243 rosservice.rosservicemain([cmd,
'find',
'test_rosmaster/AddTwoInts'])
244 d = set([x
for x
in b.getvalue().split(
'\n')
if x.strip()])
245 self.assertEquals(v, d)
248 rosservice.rosservicemain([cmd,
'find',
'fake/AddTwoInts'])
249 self.assertEquals(
'', b.getvalue().strip())
254 rosservice.get_service_class_by_name(
'fake')
255 self.fail(
"should have raised")
256 except rosservice.ROSServiceException
as e:
257 self.assertEquals(
"Service [fake] is not available.", str(e))
264 rosservice.rosservicemain([cmd,
'call'])
265 self.fail(
"arg parsing should have failed")
266 except SystemExit:
pass 268 rosservice.rosservicemain([cmd,
'call',
'add_two_ints',
'1',
'2',
'3'])
269 self.fail(
"should have failed with too many args")
270 except SystemExit:
pass 277 services = [
'/add_two_ints',
283 timeout_t = time.time() + 10.
284 while time.time() < timeout_t:
286 rosservice._rosservice_cmd_list([
'rosservice',
'list'])
287 v = [x.strip()
for x
in b.getvalue().split(
'\n')
if x.strip()]
288 if not (set(services) - set(v) ):
298 rosservice.rosservicemain([cmd,
'list'])
301 services = [
'/add_two_ints',
306 services_nodes = [
'/add_two_ints /a2iserver',
307 '/foo/add_two_ints /foo/a2iserver',
308 '/bar/add_two_ints /bar/a2iserver',
309 '/header_echo /headerserver',
313 rosservice._rosservice_cmd_list([cmd,
'list'])
314 v = [x.strip()
for x
in b.getvalue().split(
'\n')
if x.strip()]
315 v = [x
for x
in v
if not x.startswith(
'/rosout/')]
316 v = [x
for x
in v
if not x.endswith(
'/get_loggers')
and not x.endswith(
'/set_logger_level')]
317 self.assertEquals(set(services), set(v))
319 rosservice._rosservice_cmd_list([cmd,
'list',
'-n'])
320 v = [x.strip()
for x
in b.getvalue().split(
'\n')
if x.strip()]
321 v = [x
for x
in v
if not x.startswith(
'/rosout/')]
322 v = [x
for x
in v
if x.find(
'/get_loggers ') == -1
and x.find(
'/set_logger_level ') == -1]
323 self.assertEquals(set(services_nodes), set(v))
325 rosservice._rosservice_cmd_list([cmd,
'list',
'--nodes'])
326 v = [x.strip()
for x
in b.getvalue().split(
'\n')
if x.strip()]
327 v = [x
for x
in v
if not x.startswith(
'/rosout/')]
328 v = [x
for x
in v
if x.find(
'/get_loggers ') == -1
and x.find(
'/set_logger_level ') == -1]
329 self.assertEquals(set(services_nodes), set(v))
333 rosservice._rosservice_cmd_list([cmd,
'list', s, s])
334 self.fail(
"should have caused parser error")
341 rosservice._rosservice_cmd_list([cmd,
'list', s])
342 self.assertEquals(s, b.getvalue().strip())
347 rosservice._rosservice_cmd_list([cmd,
'list', s])
348 self.assertEquals(
'/add_two_ints', b.getvalue().strip())
350 rosservice._rosservice_cmd_list([cmd,
'list', s,
'-n'])
351 self.assertEquals(
'/add_two_ints /a2iserver', b.getvalue().strip())
353 rosservice._rosservice_cmd_list([cmd,
'list', s,
'--nodes'])
354 self.assertEquals(
'/add_two_ints /a2iserver', b.getvalue().strip())
358 rosservice._rosservice_cmd_list([cmd,
'list', s])
359 rosservice._rosservice_cmd_list([cmd,
'list', s,
'-n'])
360 rosservice._rosservice_cmd_list([cmd,
'list', s,
'--nodes'])
362 rosservice._rosservice_cmd_list([cmd,
'list', s])
363 rosservice._rosservice_cmd_list([cmd,
'list', s,
'-n'])
364 rosservice._rosservice_cmd_list([cmd,
'list', s,
'--nodes'])
367 NAME =
'test_rosservice' 368 if __name__ ==
'__main__':
369 rosunit.unitrun(
'test_rosservice', NAME, TestRosservice, sys.argv, coverage_packages=[
'rosservice'])
def test_full_usage(self)
def test_get_service_headers(self)
def test_get_service_type(self)
def test_get_service_class_by_name(self)