00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 PKG = 'test_roslaunch'
00037 NAME = 'test_nodeprocess'
00038
00039 import roslib; roslib.load_manifest(PKG)
00040
00041 import os
00042 import sys
00043 import unittest
00044
00045
00046 class TestNodeprocess(unittest.TestCase):
00047
00048 def test_create_master_process(self):
00049 from roslaunch.core import Node, Machine, Master, RLException
00050 from roslaunch.nodeprocess import create_master_process, LocalProcess
00051
00052 ros_root = '/ros/root'
00053 port = 1234
00054 type = Master.ROSMASTER
00055 run_id = 'foo'
00056
00057
00058 try:
00059 create_master_process(run_id, type, ros_root, -1)
00060 self.fail("shoud have thrown RLException")
00061 except RLException: pass
00062 try:
00063 create_master_process(run_id, type, ros_root, 10000000)
00064 self.fail("shoud have thrown RLException")
00065 except RLException: pass
00066 try:
00067 create_master_process(run_id, 'foo', ros_root, port)
00068 self.fail("shoud have thrown RLException")
00069 except RLException: pass
00070
00071
00072 p = create_master_process(run_id, type, ros_root, port)
00073 self.assert_(isinstance(p, LocalProcess))
00074 self.assertEquals(p.args[0], os.path.join(ros_root, 'bin', 'rosmaster'))
00075 idx = p.args.index('-p')
00076 self.failIf(idx < 1)
00077 self.assertEquals(p.args[idx+1], str(port))
00078 self.assert_('--core' in p.args)
00079
00080 self.assertEquals(p.package, 'rosmaster')
00081 p = create_master_process(run_id, type, ros_root, port)
00082
00083
00084
00085
00086 def test_create_node_process(self):
00087 from roslaunch.core import Node, Machine, RLException
00088 from roslaunch.node_args import NodeParamsException
00089 from roslaunch.nodeprocess import create_node_process, LocalProcess
00090
00091 ros_root = os.environ['ROS_ROOT']
00092 rpp = os.environ.get('ROS_PACKAGE_PATH', None)
00093 master_uri = 'http://masteruri:1234'
00094 m = Machine('name1', ros_root, rpp, '1.2.3.4')
00095
00096 run_id = 'id'
00097
00098
00099 n = Node('not_a_real_package','not_a_node')
00100 n.machine = m
00101 n.name = 'foo'
00102 try:
00103 create_node_process(run_id, n, master_uri)
00104 self.fail("should have failed")
00105 except NodeParamsException:
00106 pass
00107
00108
00109 n = Node('test_ros','talker.py')
00110 n.machine = m
00111 try:
00112 create_node_process(run_id, n, master_uri)
00113 self.fail("should have failed")
00114 except ValueError:
00115 pass
00116
00117
00118 n = Node('test_ros','talker.py')
00119
00120 n.machine = None
00121 n.name = 'talker'
00122 try:
00123 create_node_process(run_id, n, master_uri)
00124 self.fail("should have failed")
00125 except RLException:
00126 pass
00127
00128
00129 n.machine = m
00130 p = create_node_process(run_id, n, master_uri)
00131 self.assert_(isinstance(p, LocalProcess))
00132
00133
00134 d = p.env
00135 self.assertEquals(d['ROS_MASTER_URI'], master_uri)
00136 self.assertEquals(d['ROS_ROOT'], ros_root)
00137 self.assertEquals(d['PYTHONPATH'], os.path.join(ros_root, 'core', 'roslib', 'src'))
00138 if rpp:
00139 self.assertEquals(d['ROS_PACKAGE_PATH'], rpp)
00140 for k in ['ROS_IP', 'ROS_NAMESPACE']:
00141 if k in d:
00142 self.fail('%s should not be set: %s'%(k,d[k]))
00143
00144
00145 self.assertEquals(p.package, 'test_ros')
00146
00147 self.assert_(p.name.startswith('talker'), p.name)
00148
00149
00150 n.output = 'log'
00151 self.assert_(create_node_process(run_id, n, master_uri).log_output)
00152 n.output = 'screen'
00153 self.failIf(create_node_process(run_id, n, master_uri).log_output)
00154
00155
00156 n.respawn = True
00157 self.assert_(create_node_process(run_id, n, master_uri).respawn)
00158 n.respawn = False
00159 self.failIf(create_node_process(run_id, n, master_uri).respawn)
00160
00161
00162 n.cwd = None
00163 self.assertEquals(create_node_process(run_id, n, master_uri).cwd, None)
00164 n.cwd = 'ros-root'
00165 self.assertEquals(create_node_process(run_id, n, master_uri).cwd, 'ros-root')
00166 n.cwd = 'node'
00167 self.assertEquals(create_node_process(run_id, n, master_uri).cwd, 'node')
00168
00169
00170
00171
00172 n.args = ''
00173 p = create_node_process(run_id, n, master_uri)
00174
00175 cmd = roslib.packages.find_node('test_ros', 'talker.py', ros_root, rpp)
00176 self.assertEquals(p.args[0], cmd)
00177
00178
00179 n.args = "arg1 arg2 arg3"
00180 p = create_node_process(run_id, n, master_uri)
00181 self.assertEquals(p.args[0], cmd)
00182 for a in "arg1 arg2 arg3".split():
00183 self.assert_(a in p.args)
00184
00185
00186 n.remap_args = [('KEY1', 'VAL1'), ('KEY2', 'VAL2')]
00187 p = create_node_process(run_id, n, master_uri)
00188 self.assert_('KEY1:=VAL1' in p.args)
00189 self.assert_('KEY2:=VAL2' in p.args)
00190
00191
00192 n = Node('test_ros','talker.py')
00193 n.name = 'fooname'
00194 n.machine = m
00195 self.assert_('__name:=fooname' in create_node_process(run_id, n, master_uri).args)
00196
00197
00198 os.environ['SUB_TEST'] = 'subtest'
00199 os.environ['SUB_TEST2'] = 'subtest2'
00200 n.args = 'foo $(env SUB_TEST) $(env SUB_TEST2)'
00201 p = create_node_process(run_id, n, master_uri)
00202 self.failIf('SUB_TEST' in p.args)
00203 self.assert_('foo' in p.args)
00204 self.assert_('subtest' in p.args)
00205 self.assert_('subtest2' in p.args)
00206
00207 def test__cleanup_args(self):
00208
00209 from roslaunch.nodeprocess import _cleanup_remappings
00210 args = [
00211 'ROS_PACKAGE_PATH=foo',
00212 '/home/foo/monitor',
00213 '__log:=/home/pr2/21851/ros-0.7.2/log/8d769688-897d-11de-8e93-00238bdfe0ab/monitor-3.log',
00214 '__name:=bar',
00215 '__log:=/home/pr2/21851/ros-0.7.2/log/8d769688-897d-11de-8e93-00238bdfe0ab/monitor-3.log',
00216 'topic:=topic2',
00217 '__log:=/home/pr2/21851/ros-0.7.2/log/8d769688-897d-11de-8e93-00238bdfe0ab/monitor-3.log',
00218 '__log:=/home/pr2/21851/ros-0.7.2/log/8d769688-897d-11de-8e93-00238bdfe0ab/monitor-3.log',
00219 '__log:=/home/pr2/21851/ros-0.7.2/log/8d769688-897d-11de-8e93-00238bdfe0ab/monitor-3.log',
00220 '__log:=/home/pr2/21851/ros-0.7.2/log/8d769688-897d-11de-8e93-00238bdfe0ab/monitor-3.log',
00221 '__log:=/home/pr2/21851/ros-0.7.2/log/8d769688-897d-11de-8e93-00238bdfe0ab/monitor-3.log']
00222 clean_args = [
00223 'ROS_PACKAGE_PATH=foo',
00224 '/home/foo/monitor',
00225 '__name:=bar',
00226 'topic:=topic2']
00227
00228 self.assertEquals([], _cleanup_remappings([], '__log:='))
00229 self.assertEquals(clean_args, _cleanup_remappings(args, '__log:='))
00230 self.assertEquals(clean_args, _cleanup_remappings(clean_args, '__log:='))
00231 self.assertEquals(args, _cleanup_remappings(args, '_foo'))
00232
00233 def test__next_counter(self):
00234 from roslaunch.nodeprocess import _next_counter
00235 x = _next_counter()
00236 y = _next_counter()
00237 self.assert_(x +1 == y)
00238 self.assert_(x > 0)
00239
00240 def test_create_master_process2(self):
00241
00242 from roslaunch.core import Master, RLException
00243 import roslib.rosenv
00244 from roslaunch.nodeprocess import create_master_process
00245
00246 ros_root = roslib.rosenv.get_ros_root()
00247
00248
00249 failed = False
00250 try:
00251 create_master_process('runid-unittest', Master.ROSMASTER, roslib.rosenv.get_ros_root(), 0)
00252 failed = True
00253 except RLException: pass
00254 self.failIf(failed, "invalid port should have triggered error")
00255
00256
00257 m1 = create_master_process('runid-unittest', Master.ROSMASTER, ros_root, 1234)
00258 self.assertEquals('runid-unittest', m1.run_id)
00259 self.failIf(m1.started)
00260 self.failIf(m1.stopped)
00261 self.assertEquals(None, m1.cwd)
00262 self.assertEquals('master', m1.name)
00263 master_p = os.path.join(ros_root, 'bin', 'rosmaster')
00264 self.assert_(master_p in m1.args)
00265
00266 self.assertEquals(os.environ, m1.env)
00267
00268 self.assert_('--core' in m1.args)
00269
00270 idx = m1.args.index('-p')
00271 self.assertEquals('1234', m1.args[idx+1])
00272
00273
00274 m2 = create_master_process('runid-unittest', Master.ROSMASTER, ros_root, 1234)
00275 self.assertEquals('runid-unittest', m2.run_id)
00276
00277
00278 m3 = create_master_process('runid-unittest', Master.ROSMASTER, ros_root, 1234)
00279 self.assertEquals('runid-unittest', m3.run_id)
00280 master_p = os.path.join(ros_root, 'bin', 'rosmaster')
00281 self.assert_(master_p in m3.args)
00282
00283 if __name__ == '__main__':
00284 import rostest
00285 rostest.unitrun('test_roslaunch', NAME, TestNodeprocess, coverage_packages=['roslaunch.nodeprocess'])
00286