check_rosservice.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2009, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 import os
35 import sys
36 import unittest
37 try:
38  from cStringIO import StringIO
39 except ImportError:
40  from io import StringIO
41 import time
42 
43 import rosunit
44 
45 from subprocess import Popen, PIPE, check_call, call
46 
47 from contextlib import contextmanager
48 
@contextmanager
def fakestdout():
    """Temporarily replace ``sys.stdout`` with an in-memory buffer.

    Yields the ``StringIO`` buffer so the caller can inspect what was
    printed via ``getvalue()``.  The previous ``sys.stdout`` is always put
    back when the ``with`` block ends, including when the block raises.
    """
    saved_stdout = sys.stdout
    buf = StringIO()
    sys.stdout = buf
    try:
        yield buf
    finally:
        # @contextmanager re-raises the with-body's exception at the yield;
        # without the finally, a failing body would leave stdout redirected.
        sys.stdout = saved_stdout
56 
def todict(s):
    """Parse ``key: value`` lines of *s* into a dict.

    Each line is split at its first ``':'``.  Lines without a colon are
    ignored.  Values are whitespace-stripped; keys are kept exactly as they
    appear.  When a key occurs more than once, the last occurrence wins.
    """
    fields = (line.partition(':') for line in s.split('\n'))
    return dict((key, val.strip()) for key, sep, val in fields if sep)
64 
65 class TestRosservice(unittest.TestCase):
66 
67  def setUp(self):
68  pass
69 
71  import rosservice
72  orig_uri = os.environ['ROS_MASTER_URI']
73  os.environ['ROS_MASTER_URI'] = 'http://fake_host:12356'
74  try:
75  c = 'rosservice'
76 
77  # test error conditions, integration tests cover success cases
78  try:
79  rosservice.get_service_headers('/add_two_ints', 'fake://localhost:1234')
80  self.fail("should have raised")
81  except rosservice.ROSServiceException: pass
82  try:
83  rosservice.get_service_headers('/add_two_ints', 'rosrpc://fake_host:1234')
84  self.fail("should have raised IO exc")
85  except rosservice.ROSServiceIOException: pass
86 
87 
88  finally:
89  os.environ['ROS_MASTER_URI'] = orig_uri
90 
92  import rosservice
93  self.assertEquals('test_rosmaster/AddTwoInts', rosservice.get_service_type('/add_two_ints'))
94  self.assertEquals(None, rosservice.get_service_type('/fake_add_two_ints'))
95 
96  def test_offline(self):
97  import rosservice
98  orig_uri = os.environ['ROS_MASTER_URI']
99  os.environ['ROS_MASTER_URI'] = 'http://fake_host:12356'
100 
101  try:
102  c = 'rosservice'
103 
104  try:
105  rosservice.get_service_type('/add_two_ints')
106  self.fail("should have raised ROSServiceIOException")
107  except rosservice.ROSServiceIOException:
108  pass
109 
110  try:
111  rosservice._rosservice_cmd_list([c, 'list'])
112  self.fail("should have raised ROSServiceIOException")
113  except rosservice.ROSServiceIOException: pass
114 
115  try:
116  rosservice._rosservice_cmd_info([c, 'info', '/add_two_ints'])
117  self.fail("should have raised ROSServiceIOException")
118  except rosservice.ROSServiceIOException: pass
119 
120  try:
121  rosservice._rosservice_cmd_type([c, 'type', '/add_two_ints'])
122  self.fail("should have raised ROSServiceIOException")
123  except rosservice.ROSServiceIOException: pass
124 
125  try:
126  rosservice._rosservice_cmd_uri([c, 'uri', '/add_two_ints'])
127  self.fail("should have raised ROSServiceIOException")
128  except rosservice.ROSServiceIOException: pass
129 
130  try:
131  rosservice._rosservice_cmd_find([c, 'find', 'test_ros/AddTwoInts'])
132  self.fail("should have raised ROSServiceIOException")
133  except rosservice.ROSServiceIOException: pass
134 
135  try:
136  rosservice._rosservice_cmd_call([c, 'call', '/add_two_ints', '1', '2'])
137  self.fail("should have raised ROSServiceIOException")
138  except rosservice.ROSServiceIOException: pass
139 
140  finally:
141  os.environ['ROS_MASTER_URI'] = orig_uri
142 
143  def test_cmd_type(self):
144  import rosservice
145  cmd = 'rosservice'
146  s = '/add_two_ints'
147  try:
148  rosservice.rosservicemain([cmd, 'type', '/fake_service'])
149  self.fail("should have triggered error exit")
150  except SystemExit:
151  pass
152 
153  for s in ['/add_two_ints', 'add_two_ints', 'foo/add_two_ints']:
154  with fakestdout() as b:
155  rosservice.rosservicemain([cmd, 'type', s])
156  v = b.getvalue().strip()
157  self.assertEquals('test_rosmaster/AddTwoInts', v)
158 
159  def test_cmd_uri(self):
160  import rosservice
161  cmd = 'rosservice'
162  with fakestdout() as b:
163  try:
164  rosservice.rosservicemain([cmd, 'uri', '/fake_service'])
165  self.fail("should have triggered error exit")
166  except SystemExit:
167  pass
168 
169  for s in ['/add_two_ints', 'add_two_ints', 'foo/add_two_ints']:
170  with fakestdout() as b:
171  rosservice.rosservicemain([cmd, 'uri', s])
172  v = b.getvalue().strip()
173  self.assert_(v.startswith('rosrpc://'), v)
174 
175 
176  def test_cmd_node(self):
177  import rosservice
178  cmd = 'rosservice'
179  for s in ['/add_two_ints', 'add_two_ints', 'foo/add_two_ints']:
180  with fakestdout() as b:
181  rosservice.rosservicemain([cmd, 'node', s])
182  v = b.getvalue().strip()
183  if 'foo' in s:
184  self.assertEquals('/foo/a2iserver', v)
185  else:
186  self.assertEquals('/a2iserver', v)
187  try:
188  rosservice.rosservicemain([cmd, 'node', '/fake_two_ints'])
189  self.fail("should have exited with error")
190  except SystemExit: pass
191 
192  def test_full_usage(self):
193  import rosservice
194  try:
195  rosservice._fullusage()
196  self.fail("should have caused system exit")
197  except SystemExit: pass
198 
199  def test_cmd_info(self):
200  import rosservice
201  cmd = 'rosservice'
202 
203  try:
204  rosservice.rosservicemain([cmd, 'info'])
205  self.fail("should have exited with error")
206  except SystemExit: pass
207  try:
208  rosservice.rosservicemain([cmd, 'info', '/fake_service'])
209  self.fail("should have exited with error")
210  except SystemExit: pass
211  try:
212  rosservice.rosservicemain([cmd, 'info', '/add_two_ints', '/foo/add_two_ints'])
213  self.fail("should have exited with error")
214  except SystemExit: pass
215 
216  for s in ['/add_two_ints', 'add_two_ints', 'foo/add_two_ints']:
217  with fakestdout() as b:
218  rosservice.rosservicemain([cmd, 'info', s])
219  d = todict(b.getvalue())
220  if 'foo' in s:
221  self.assertEquals('/foo/a2iserver', d['Node'])
222  else:
223  self.assertEquals('/a2iserver', d['Node'], repr(d['Node']))
224  self.assertEquals('test_rosmaster/AddTwoInts', d['Type'])
225  self.assertEquals('a b', d['Args'])
226  self.assert_('URI' in d)
227 
228  def test_cmd_find(self):
229  import rosservice
230  cmd = 'rosservice'
231 
232  try:
233  rosservice.rosservicemain([cmd, 'find'])
234  self.fail("arg parsing should have failed")
235  except SystemExit: pass
236  try:
237  rosservice.rosservicemain([cmd, 'find', 'test_ros/AddTwoInts', 'test/AddThreeInts'])
238  self.fail("arg parsing should have failed")
239  except SystemExit: pass
240 
241  v = set(['/add_two_ints', '/bar/add_two_ints', '/foo/add_two_ints'])
242  with fakestdout() as b:
243  rosservice.rosservicemain([cmd, 'find', 'test_rosmaster/AddTwoInts'])
244  d = set([x for x in b.getvalue().split('\n') if x.strip()])
245  self.assertEquals(v, d)
246 
247  with fakestdout() as b:
248  rosservice.rosservicemain([cmd, 'find', 'fake/AddTwoInts'])
249  self.assertEquals('', b.getvalue().strip())
250 
252  import rosservice
253  try:
254  rosservice.get_service_class_by_name('fake')
255  self.fail("should have raised")
256  except rosservice.ROSServiceException as e:
257  self.assertEquals("Service [fake] is not available.", str(e))
258 
259  def test_cmd_call(self):
260  import rosservice
261  cmd = 'rosservice'
262 
263  try:
264  rosservice.rosservicemain([cmd, 'call'])
265  self.fail("arg parsing should have failed")
266  except SystemExit: pass
267  try:
268  rosservice.rosservicemain([cmd, 'call', 'add_two_ints', '1', '2', '3'])
269  self.fail("should have failed with too many args")
270  except SystemExit: pass
271 
272 
273  def setUp(self):
274  # wait for all services to come up
275 
276  import rosservice
277  services = ['/add_two_ints',
278  '/foo/add_two_ints',
279  '/bar/add_two_ints',
280  ]
281 
282  import time
283  timeout_t = time.time() + 10.
284  while time.time() < timeout_t:
285  with fakestdout() as b:
286  rosservice._rosservice_cmd_list(['rosservice', 'list'])
287  v = [x.strip() for x in b.getvalue().split('\n') if x.strip()]
288  if not (set(services) - set(v) ):
289  return
290  self.fail("timeout")
291 
292  def test_cmd_list(self):
293  import rosservice
294  cmd = 'rosservice'
295  s = '/add_two_ints'
296 
297  # test main entry
298  rosservice.rosservicemain([cmd, 'list'])
299 
300  # test directly
301  services = ['/add_two_ints',
302  '/foo/add_two_ints',
303  '/bar/add_two_ints',
304  '/header_echo',
305  ]
306  services_nodes = ['/add_two_ints /a2iserver',
307  '/foo/add_two_ints /foo/a2iserver',
308  '/bar/add_two_ints /bar/a2iserver',
309  '/header_echo /headerserver',
310  ]
311 
312  with fakestdout() as b:
313  rosservice._rosservice_cmd_list([cmd, 'list'])
314  v = [x.strip() for x in b.getvalue().split('\n') if x.strip()]
315  v = [x for x in v if not x.startswith('/rosout/')]
316  v = [x for x in v if not x.endswith('/get_loggers') and not x.endswith('/set_logger_level')]
317  self.assertEquals(set(services), set(v))
318  with fakestdout() as b:
319  rosservice._rosservice_cmd_list([cmd, 'list', '-n'])
320  v = [x.strip() for x in b.getvalue().split('\n') if x.strip()]
321  v = [x for x in v if not x.startswith('/rosout/')]
322  v = [x for x in v if x.find('/get_loggers ') == -1 and x.find('/set_logger_level ') == -1]
323  self.assertEquals(set(services_nodes), set(v))
324  with fakestdout() as b:
325  rosservice._rosservice_cmd_list([cmd, 'list', '--nodes'])
326  v = [x.strip() for x in b.getvalue().split('\n') if x.strip()]
327  v = [x for x in v if not x.startswith('/rosout/')]
328  v = [x for x in v if x.find('/get_loggers ') == -1 and x.find('/set_logger_level ') == -1]
329  self.assertEquals(set(services_nodes), set(v))
330 
331  # test with multiple service names
332  try:
333  rosservice._rosservice_cmd_list([cmd, 'list', s, s])
334  self.fail("should have caused parser error")
335  except SystemExit:
336  pass
337 
338  # test with resolved service names
339  for s in services:
340  with fakestdout() as b:
341  rosservice._rosservice_cmd_list([cmd, 'list', s])
342  self.assertEquals(s, b.getvalue().strip())
343 
344  # test with relative service names
345  s = 'add_two_ints'
346  with fakestdout() as b:
347  rosservice._rosservice_cmd_list([cmd, 'list', s])
348  self.assertEquals('/add_two_ints', b.getvalue().strip())
349  with fakestdout() as b:
350  rosservice._rosservice_cmd_list([cmd, 'list', s, '-n'])
351  self.assertEquals('/add_two_ints /a2iserver', b.getvalue().strip())
352  with fakestdout() as b:
353  rosservice._rosservice_cmd_list([cmd, 'list', s, '--nodes'])
354  self.assertEquals('/add_two_ints /a2iserver', b.getvalue().strip())
355 
356  # test with namespaces
357  s = '/foo'
358  rosservice._rosservice_cmd_list([cmd, 'list', s])
359  rosservice._rosservice_cmd_list([cmd, 'list', s, '-n'])
360  rosservice._rosservice_cmd_list([cmd, 'list', s, '--nodes'])
361  s = 'foo'
362  rosservice._rosservice_cmd_list([cmd, 'list', s])
363  rosservice._rosservice_cmd_list([cmd, 'list', s, '-n'])
364  rosservice._rosservice_cmd_list([cmd, 'list', s, '--nodes'])
365 
366 
# Test name passed to rosunit alongside the package name below.
NAME = 'test_rosservice'

if __name__ == '__main__':
    # Hand the test case to rosunit together with the command-line
    # arguments; 'rosservice' is passed as the coverage package.
    rosunit.unitrun(
        'test_rosservice',
        NAME,
        TestRosservice,
        sys.argv,
        coverage_packages=['rosservice'],
    )


test_rosservice
Author(s): Ken Conley, Dirk Thomas
autogenerated on Mon Nov 2 2020 03:52:57