$search
#!/usr/bin/env python
# Software License Agreement (BSD License)
#
# Copyright (c) 2009, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above
#    copyright notice, this list of conditions and the following
#    disclaimer in the documentation and/or other materials provided
#    with the distribution.
#  * Neither the name of Willow Garage, Inc. nor the names of its
#    contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$

PKG = 'test_roslaunch'
NAME = 'test_nodeprocess'

# load_manifest(PKG) is called before the roslaunch imports used by the tests
import roslib; roslib.load_manifest(PKG)

import os
import sys
import unittest

## Test roslaunch.nodeprocess
#
# Plain '#' comments are used instead of docstrings on the test methods so
# that unittest keeps reporting each test by its method name.
class TestNodeprocess(unittest.TestCase):

    ## create_master_process() must raise RLException for bad ports and
    ## an unknown master type, and return a LocalProcess whose args
    ## launch rosmaster on the requested port for valid parameters.
    def test_create_master_process(self):
        from roslaunch.core import Master, RLException
        from roslaunch.nodeprocess import create_master_process, LocalProcess

        ros_root = '/ros/root'
        port = 1234
        # named master_type rather than 'type' so the builtin is not shadowed
        master_type = Master.ROSMASTER
        run_id = 'foo'

        # test invalid params: negative port, very large port, unknown type
        self.assertRaises(RLException, create_master_process,
                          run_id, master_type, ros_root, -1)
        self.assertRaises(RLException, create_master_process,
                          run_id, master_type, ros_root, 10000000)
        self.assertRaises(RLException, create_master_process,
                          run_id, 'foo', ros_root, port)

        # test valid params
        p = create_master_process(run_id, master_type, ros_root, port)
        self.assertTrue(isinstance(p, LocalProcess))
        self.assertEqual(p.args[0], os.path.join(ros_root, 'bin', 'rosmaster'))
        # list.index() raises ValueError if '-p' is missing altogether;
        # the assertion below additionally rejects '-p' as the first arg
        idx = p.args.index('-p')
        self.assertFalse(idx < 1)
        self.assertEqual(p.args[idx+1], str(port))
        self.assertTrue('--core' in p.args)

        self.assertEqual(p.package, 'rosmaster')
        # creating a second process with the same parameters must not raise
        p = create_master_process(run_id, master_type, ros_root, port)

        # TODO: have to think more as to the correct environment for the master process

    ## create_node_process() must reject unknown nodes, unnamed nodes and
    ## nodes without a machine, and otherwise return a LocalProcess whose
    ## env, package, name, log_output, respawn, cwd and args reflect the
    ## Node configuration.
    ## Requires ROS_ROOT to be set in the environment.
    def test_create_node_process(self):
        from roslaunch.core import Node, Machine, RLException
        from roslaunch.node_args import NodeParamsException
        from roslaunch.nodeprocess import create_node_process, LocalProcess
        # find_node() is used below; import its module explicitly instead of
        # relying on another import having loaded it as a side effect
        import roslib.packages

        # have to use real ROS configuration for these tests
        ros_root = os.environ['ROS_ROOT']
        rpp = os.environ.get('ROS_PACKAGE_PATH', None)
        master_uri = 'http://masteruri:1234'
        m = Machine('name1', ros_root, rpp, '1.2.3.4')

        run_id = 'id'

        # test invalid params
        n = Node('not_a_real_package','not_a_node')
        n.machine = m
        n.name = 'foo'
        # should fail b/c node cannot be found
        self.assertRaises(NodeParamsException, create_node_process,
                          run_id, n, master_uri)

        # have to specify a real node
        n = Node('test_ros','talker.py')
        n.machine = m
        # should fail b/c n.name is not set
        self.assertRaises(ValueError, create_node_process,
                          run_id, n, master_uri)

        # have to specify a real node
        n = Node('test_ros','talker.py')

        n.machine = None
        n.name = 'talker'
        # should fail b/c n.machine is not set
        self.assertRaises(RLException, create_node_process,
                          run_id, n, master_uri)

        # basic integration test
        n.machine = m
        p = create_node_process(run_id, n, master_uri)
        self.assertTrue(isinstance(p, LocalProcess))

        # repeat some setup_local_process_env tests
        d = p.env
        self.assertEqual(d['ROS_MASTER_URI'], master_uri)
        self.assertEqual(d['ROS_ROOT'], ros_root)
        self.assertEqual(d['PYTHONPATH'], os.path.join(ros_root, 'core', 'roslib', 'src'))
        if rpp:
            self.assertEqual(d['ROS_PACKAGE_PATH'], rpp)
        for k in ['ROS_IP', 'ROS_NAMESPACE']:
            if k in d:
                self.fail('%s should not be set: %s'%(k,d[k]))

        # test package and name
        self.assertEqual(p.package, 'test_ros')
        # - no 'correct' full answer here
        self.assertTrue(p.name.startswith('talker'), p.name)

        # test log_output
        n.output = 'log'
        self.assertTrue(create_node_process(run_id, n, master_uri).log_output)
        n.output = 'screen'
        self.assertFalse(create_node_process(run_id, n, master_uri).log_output)

        # test respawn
        n.respawn = True
        self.assertTrue(create_node_process(run_id, n, master_uri).respawn)
        n.respawn = False
        self.assertFalse(create_node_process(run_id, n, master_uri).respawn)

        # test cwd
        n.cwd = None
        self.assertEqual(create_node_process(run_id, n, master_uri).cwd, None)
        n.cwd = 'ros-root'
        self.assertEqual(create_node_process(run_id, n, master_uri).cwd, 'ros-root')
        n.cwd = 'node'
        self.assertEqual(create_node_process(run_id, n, master_uri).cwd, 'node')

        # test args

        # - simplest test (no args)
        n.args = ''
        p = create_node_process(run_id, n, master_uri)
        # - the first arg should be the path to the node executable
        cmd = roslib.packages.find_node('test_ros', 'talker.py', ros_root, rpp)
        self.assertEqual(p.args[0], cmd)

        # - test basic args
        n.args = "arg1 arg2 arg3"
        p = create_node_process(run_id, n, master_uri)
        self.assertEqual(p.args[0], cmd)
        for a in "arg1 arg2 arg3".split():
            self.assertTrue(a in p.args)

        # - test remap args
        n.remap_args = [('KEY1', 'VAL1'), ('KEY2', 'VAL2')]
        p = create_node_process(run_id, n, master_uri)
        self.assertTrue('KEY1:=VAL1' in p.args)
        self.assertTrue('KEY2:=VAL2' in p.args)

        # - test __name
        n = Node('test_ros','talker.py')
        n.name = 'fooname'
        n.machine = m
        self.assertTrue('__name:=fooname' in create_node_process(run_id, n, master_uri).args)

        # - test substitution args
        # remember any pre-existing values so the variables set here do not
        # leak into whatever runs later in the same process
        saved_env = dict((k, os.environ.get(k)) for k in ('SUB_TEST', 'SUB_TEST2'))
        os.environ['SUB_TEST'] = 'subtest'
        os.environ['SUB_TEST2'] = 'subtest2'
        try:
            n.args = 'foo $(env SUB_TEST) $(env SUB_TEST2)'
            p = create_node_process(run_id, n, master_uri)
            # the variable name itself must not survive substitution
            self.assertFalse('SUB_TEST' in p.args)
            self.assertTrue('foo' in p.args)
            self.assertTrue('subtest' in p.args)
            self.assertTrue('subtest2' in p.args)
        finally:
            for k, v in saved_env.items():
                if v is not None:
                    os.environ[k] = v
                elif k in os.environ:
                    del os.environ[k]

    ## _cleanup_remappings(args, prefix) is expected to return args without
    ## the entries that start with prefix, leaving other entries untouched.
    def test__cleanup_args(self):
        # #1595
        from roslaunch.nodeprocess import _cleanup_remappings
        # the same __log remapping shows up seven times in the input
        log_arg = '__log:=/home/pr2/21851/ros-0.7.2/log/8d769688-897d-11de-8e93-00238bdfe0ab/monitor-3.log'
        args = [
            'ROS_PACKAGE_PATH=foo',
            '/home/foo/monitor',
            log_arg,
            '__name:=bar',
            log_arg,
            'topic:=topic2',
            log_arg,
            log_arg,
            log_arg,
            log_arg,
            log_arg]
        clean_args = [
            'ROS_PACKAGE_PATH=foo',
            '/home/foo/monitor',
            '__name:=bar',
            'topic:=topic2']

        self.assertEqual([], _cleanup_remappings([], '__log:='))
        self.assertEqual(clean_args, _cleanup_remappings(args, '__log:='))
        # already-clean input comes back unchanged
        self.assertEqual(clean_args, _cleanup_remappings(clean_args, '__log:='))
        # a prefix that matches nothing removes nothing
        self.assertEqual(args, _cleanup_remappings(args, '_foo'))

    ## Two consecutive _next_counter() calls must yield positive values
    ## that differ by exactly one.
    def test__next_counter(self):
        from roslaunch.nodeprocess import _next_counter
        x = _next_counter()
        y = _next_counter()
        self.assertEqual(x + 1, y)
        self.assertTrue(x > 0)

    ## Second set of create_master_process() checks, run against the
    ## ros root reported by roslib.rosenv.get_ros_root().
    def test_create_master_process2(self):
        # accidentally wrote two versions of this, need to merge
        from roslaunch.core import Master, RLException
        import roslib.rosenv
        from roslaunch.nodeprocess import create_master_process

        ros_root = roslib.rosenv.get_ros_root()

        # test failures
        try:
            create_master_process('runid-unittest', Master.ROSMASTER, ros_root, 0)
        except RLException:
            pass
        else:
            self.fail("invalid port should have triggered error")

        # test success with ROSMASTER
        m1 = create_master_process('runid-unittest', Master.ROSMASTER, ros_root, 1234)
        self.assertEqual('runid-unittest', m1.run_id)
        self.assertFalse(m1.started)
        self.assertFalse(m1.stopped)
        self.assertEqual(None, m1.cwd)
        self.assertEqual('master', m1.name)
        master_p = os.path.join(ros_root, 'bin', 'rosmaster')
        self.assertTrue(master_p in m1.args)
        # - it should have the default environment
        self.assertEqual(os.environ, m1.env)
        # - check args
        self.assertTrue('--core' in m1.args)
        # - make sure port argument is correct
        idx = m1.args.index('-p')
        self.assertEqual('1234', m1.args[idx+1])

        # test port argument
        m2 = create_master_process('runid-unittest', Master.ROSMASTER, ros_root, 1234)
        self.assertEqual('runid-unittest', m2.run_id)

        # test ros_root argument
        m3 = create_master_process('runid-unittest', Master.ROSMASTER, ros_root, 1234)
        self.assertEqual('runid-unittest', m3.run_id)
        master_p = os.path.join(ros_root, 'bin', 'rosmaster')
        self.assertTrue(master_p in m3.args)

if __name__ == '__main__':
    import rostest
    rostest.unitrun(PKG, NAME, TestNodeprocess, coverage_packages=['roslaunch.nodeprocess'])