check_rosservice_command_line_online.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2009, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 import os
35 import sys
36 import time
37 import unittest
38 
39 import rospy
40 import rostest
41 
42 import test_rosmaster.srv
43 
44 from subprocess import Popen, PIPE, check_call, call
45 
46 from threading import Thread
47 class TestTask(Thread):
48  def __init__(self, task):
49  Thread.__init__(self)
50  self.task = task
51  self.success = False
52  self.done = False
53  self.value = None
54 
55  def run(self):
56  try:
57  print("STARTING TASK")
58  self.value = self.task()
59  print("TASK HAS COMPLETED")
60  self.success = True
61  except:
62  import traceback
63  traceback.print_exc()
64  self.done = True
65 
66 class TestRosserviceOnline(unittest.TestCase):
67 
68  def setUp(self):
69  pass
70 
71  def test_rosservice(self):
72  # wait for network to initialize
73  services = ['/add_two_ints', '/foo/add_two_ints', '/bar/add_two_ints']
74  for s in services:
75  rospy.wait_for_service(s)
76 
77  cmd = 'rosservice'
78  names = ['add_two_ints', '/add_two_ints', 'foo/add_two_ints', '/bar/add_two_ints']
79 
80  # list
81  # - hard to exact match as we are still adding builtin services to nodes (e.g. set_logger_level)
82  output = Popen([cmd, 'list'], stdout=PIPE).communicate()[0]
83  output = output.decode()
84  l = set(output.split())
85  for s in services:
86  self.assert_(s in l)
87 
88  for name in names:
89  # args
90  output = Popen([cmd, 'args', name], stdout=PIPE).communicate()[0]
91  self.assertEquals(b'a b', output.strip())
92 
93  # type
94  output = Popen([cmd, 'type', name], stdout=PIPE).communicate()[0]
95  self.assertEquals(b'test_rosmaster/AddTwoInts', output.strip())
96 
97  # find
98  output = Popen([cmd, 'find', 'test_rosmaster/AddTwoInts'], stdout=PIPE).communicate()[0]
99  values = [v.strip() for v in output.decode().split('\n') if v.strip()]
100  self.assertEquals(set(values), set(services))
101 
102  # uri
103  output = Popen([cmd, 'uri', name], stdout=PIPE).communicate()[0]
104  # - no exact answer
105  self.assert_(output.decode().startswith('rosrpc://'), output)
106 
107  # call
108  output = Popen([cmd, 'call', '--wait', name, '1', '2'], stdout=PIPE).communicate()[0]
109  self.assertEquals(b'sum: 3', output.strip())
110 
111  output = Popen([cmd, 'call', name, '1', '2'], stdout=PIPE).communicate()[0]
112  self.assertEquals(b'sum: 3', output.strip())
113 
114  name = 'header_echo'
115  # test with a Header so we can validate keyword args
116  import yaml
117  import time
118  t = time.time()
119 
120  # test with empty headers
121  for v in ['{}', '{header: {}}', '{header: {seq: 0}}']:
122  output = Popen([cmd, 'call', name, v], stdout=PIPE).communicate()[0]
123  output = output.strip()
124  self.assert_(output, output)
125  val = yaml.load(output)['header']
126  self.assertEquals('', val['frame_id'])
127  self.assert_(val['seq'] >= 0)
128  self.assertEquals(0, val['stamp']['secs'])
129  self.assertEquals(0, val['stamp']['nsecs'])
130 
131  # test with auto headers
132  for v in ['{header: auto}', '{header: {stamp: now}}']:
133  output = Popen([cmd, 'call', name, v], stdout=PIPE).communicate()[0]
134  val = yaml.load(output.strip())['header']
135  self.assertEquals('', val['frame_id'])
136  self.assert_(val['seq'] >= 0)
137  self.assert_(val['stamp']['secs'] >= int(t))
138 
139 
140  # verify that it respects ROS_NS
141  # - the uris should be different as the names should resolve differently
142  env = os.environ.copy()
143  env['ROS_NAMESPACE'] = 'foo'
144  uri1 = Popen([cmd, 'uri', 'add_two_ints'], stdout=PIPE).communicate()[0]
145  uri2 = Popen([cmd, 'uri', 'add_two_ints'], env=env, stdout=PIPE).communicate()[0]
146  self.assert_(uri2.decode().startswith('rosrpc://'))
147  self.assertNotEquals(uri1, uri2)
148 
149  # test_call_wait
150  def task1():
151  output = Popen([cmd, 'call', '--wait', 'wait_two_ints', '1', '2'], stdout=PIPE).communicate()[0]
152  self.assertEquals(b'sum: 3', output.strip())
153  timeout_t = time.time() + 5.
154  t1 = TestTask(task1)
155  t1.start()
156 
157  rospy.init_node('test_call_wait')
158  rospy.Service("wait_two_ints", test_rosmaster.srv.AddTwoInts, lambda x: x.a + x.b)
159  while not t1.done and time.time() < timeout_t:
160  time.sleep(0.5)
161  self.assert_(t1.success)
162 
163 
164 PKG = 'test_rosservice'
165 NAME = 'test_rosservice_command_line_online'
166 if __name__ == '__main__':
167  rostest.run(PKG, NAME, TestRosserviceOnline, sys.argv)


test_rosservice
Author(s): Ken Conley
autogenerated on Sun Feb 3 2019 03:30:23