00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 PKG = 'test_rosservice'
00037 NAME = 'test_rosservice_command_line_online'
00038 import roslib; roslib.load_manifest(PKG)
00039
00040 import os
00041 import sys
00042 import time
00043 import unittest
00044
00045 import rospy
00046 import rostest
00047
00048 import test_ros.srv
00049
00050 from subprocess import Popen, PIPE, check_call, call
00051
00052 from threading import Thread
00053 class TestTask(Thread):
00054 def __init__(self, task):
00055 Thread.__init__(self)
00056 self.task = task
00057 self.success = False
00058 self.done = False
00059 self.value = None
00060
00061 def run(self):
00062 try:
00063 print "STARTING TASK"
00064 self.value = self.task()
00065 print "TASK HAS COMPLETED"
00066 self.success = True
00067 except:
00068 import traceback
00069 traceback.print_exc()
00070 self.done = True
00071
00072 class TestRosserviceOnline(unittest.TestCase):
00073
00074 def setUp(self):
00075 pass
00076
00077 def test_rosservice(self):
00078
00079 services = ['/add_two_ints', '/foo/add_two_ints', '/bar/add_two_ints']
00080 for s in services:
00081 rospy.wait_for_service(s)
00082
00083 cmd = 'rosservice'
00084 names = ['add_two_ints', '/add_two_ints', 'foo/add_two_ints', '/bar/add_two_ints']
00085
00086
00087
00088 output = Popen([cmd, 'list'], stdout=PIPE).communicate()[0]
00089 l = set(output.split())
00090 for s in services:
00091 self.assert_(s in l)
00092
00093 for name in names:
00094
00095 output = Popen([cmd, 'args', name], stdout=PIPE).communicate()[0]
00096 self.assertEquals('a b', output.strip())
00097
00098
00099 output = Popen([cmd, 'type', name], stdout=PIPE).communicate()[0]
00100 self.assertEquals('test_ros/AddTwoInts', output.strip())
00101
00102
00103 output = Popen([cmd, 'find', 'test_ros/AddTwoInts'], stdout=PIPE).communicate()[0]
00104 values = [v.strip() for v in output.split('\n') if v.strip()]
00105 self.assertEquals(set(values), set(services))
00106
00107
00108 output = Popen([cmd, 'uri', name], stdout=PIPE).communicate()[0]
00109
00110 self.assert_(output.startswith('rosrpc://'), output)
00111
00112
00113 output = Popen([cmd, 'call', '--wait', name, '1', '2'], stdout=PIPE).communicate()[0]
00114 self.assertEquals('sum: 3', output.strip())
00115
00116 output = Popen([cmd, 'call', name, '1', '2'], stdout=PIPE).communicate()[0]
00117 self.assertEquals('sum: 3', output.strip())
00118
00119 name = 'header_echo'
00120
00121 import yaml
00122 import time
00123 t = time.time()
00124
00125
00126 for v in ['{}', '{header: {}}', '{header: {seq: 0}}']:
00127 output = Popen([cmd, 'call', name, v], stdout=PIPE).communicate()[0]
00128 val = yaml.load(output.strip())['header']
00129 self.assertEquals('', val['frame_id'])
00130 self.assert_(val['seq'] >= 0)
00131 self.assertEquals(0, val['stamp']['secs'])
00132 self.assertEquals(0, val['stamp']['nsecs'])
00133
00134
00135 for v in ['{header: auto}', '{header: {stamp: now}}']:
00136 output = Popen([cmd, 'call', name, v], stdout=PIPE).communicate()[0]
00137 val = yaml.load(output.strip())['header']
00138 self.assertEquals('', val['frame_id'])
00139 self.assert_(val['seq'] >= 0)
00140 self.assert_(val['stamp']['secs'] >= int(t))
00141
00142
00143
00144
00145 env = os.environ.copy()
00146 env['ROS_NAMESPACE'] = 'foo'
00147 uri1 = Popen([cmd, 'uri', 'add_two_ints'], stdout=PIPE).communicate()[0]
00148 uri2 = Popen([cmd, 'uri', 'add_two_ints'], env=env, stdout=PIPE).communicate()[0]
00149 self.assert_(uri2.startswith('rosrpc://'))
00150 self.assertNotEquals(uri1, uri2)
00151
00152
00153 def task1():
00154 output = Popen([cmd, 'call', '--wait', 'wait_two_ints', '1', '2'], stdout=PIPE).communicate()[0]
00155 self.assertEquals('sum: 3', output.strip())
00156 timeout_t = time.time() + 5.
00157 t1 = TestTask(task1)
00158 t1.start()
00159
00160 rospy.init_node('test_call_wait')
00161 rospy.Service("wait_two_ints", test_ros.srv.AddTwoInts, lambda x: x.a + x.b)
00162 while not t1.done and time.time() < timeout_t:
00163 time.sleep(0.5)
00164 self.assert_(t1.success)
00165
00166
00167 if __name__ == '__main__':
00168 rostest.run(PKG, NAME, TestRosserviceOnline, sys.argv)