42 import test_rosmaster.srv
44 from subprocess
import Popen, PIPE, check_call, call
46 from threading
import Thread
57 print(
"STARTING TASK")
59 print(
"TASK HAS COMPLETED")
73 services = [
'/add_two_ints',
'/foo/add_two_ints',
'/bar/add_two_ints']
75 rospy.wait_for_service(s)
78 names = [
'add_two_ints',
'/add_two_ints',
'foo/add_two_ints',
'/bar/add_two_ints']
82 output = Popen([cmd,
'list'], stdout=PIPE).communicate()[0]
83 output = output.decode()
84 l = set(output.split())
90 output = Popen([cmd,
'args', name], stdout=PIPE).communicate()[0]
91 self.assertEquals(b
'a b', output.strip())
94 output = Popen([cmd,
'type', name], stdout=PIPE).communicate()[0]
95 self.assertEquals(b
'test_rosmaster/AddTwoInts', output.strip())
98 output = Popen([cmd,
'find',
'test_rosmaster/AddTwoInts'], stdout=PIPE).communicate()[0]
99 values = [v.strip()
for v
in output.decode().split(
'\n')
if v.strip()]
100 self.assertEquals(set(values), set(services))
103 output = Popen([cmd,
'uri', name], stdout=PIPE).communicate()[0]
105 self.assert_(output.decode().startswith(
'rosrpc://'), output)
108 output = Popen([cmd,
'call',
'--wait', name,
'1',
'2'], stdout=PIPE).communicate()[0]
109 self.assertEquals(b
'sum: 3', output.strip())
111 output = Popen([cmd,
'call', name,
'1',
'2'], stdout=PIPE).communicate()[0]
112 self.assertEquals(b
'sum: 3', output.strip())
121 for v
in [
'{}',
'{header: {}}',
'{header: {seq: 0}}']:
122 output = Popen([cmd,
'call', name, v], stdout=PIPE).communicate()[0]
123 output = output.strip()
124 self.assert_(output, output)
125 val = yaml.load(output)[
'header']
126 self.assertEquals(
'', val[
'frame_id'])
127 self.assert_(val[
'seq'] >= 0)
128 self.assertEquals(0, val[
'stamp'][
'secs'])
129 self.assertEquals(0, val[
'stamp'][
'nsecs'])
132 for v
in [
'{header: auto}',
'{header: {stamp: now}}']:
133 output = Popen([cmd,
'call', name, v], stdout=PIPE).communicate()[0]
134 val = yaml.load(output.strip())[
'header']
135 self.assertEquals(
'', val[
'frame_id'])
136 self.assert_(val[
'seq'] >= 0)
137 self.assert_(val[
'stamp'][
'secs'] >= int(t))
142 env = os.environ.copy()
143 env[
'ROS_NAMESPACE'] =
'foo' 144 uri1 = Popen([cmd,
'uri',
'add_two_ints'], stdout=PIPE).communicate()[0]
145 uri2 = Popen([cmd,
'uri',
'add_two_ints'], env=env, stdout=PIPE).communicate()[0]
146 self.assert_(uri2.decode().startswith(
'rosrpc://'))
147 self.assertNotEquals(uri1, uri2)
151 output = Popen([cmd,
'call',
'--wait',
'wait_two_ints',
'1',
'2'], stdout=PIPE).communicate()[0]
152 self.assertEquals(b
'sum: 3', output.strip())
153 timeout_t = time.time() + 5.
157 rospy.init_node(
'test_call_wait')
158 rospy.Service(
"wait_two_ints", test_rosmaster.srv.AddTwoInts,
lambda x: x.a + x.b)
159 while not t1.done
and time.time() < timeout_t:
161 self.assert_(t1.success)
164 PKG =
'test_rosservice' 165 NAME =
'test_rosservice_command_line_online' 166 if __name__ ==
'__main__':
167 rostest.run(PKG, NAME, TestRosserviceOnline, sys.argv)
def test_rosservice(self)