$search
00001 #!/usr/bin/env python 00002 # Software License Agreement (BSD License) 00003 # 00004 # Copyright (c) 2008, Willow Garage, Inc. 00005 # All rights reserved. 00006 # 00007 # Redistribution and use in source and binary forms, with or without 00008 # modification, are permitted provided that the following conditions 00009 # are met: 00010 # 00011 # * Redistributions of source code must retain the above copyright 00012 # notice, this list of conditions and the following disclaimer. 00013 # * Redistributions in binary form must reproduce the above 00014 # copyright notice, this list of conditions and the following 00015 # disclaimer in the documentation and/or other materials provided 00016 # with the distribution. 00017 # * Neither the name of Willow Garage, Inc. nor the names of its 00018 # contributors may be used to endorse or promote products derived 00019 # from this software without specific prior written permission. 00020 # 00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 # POSSIBILITY OF SUCH DAMAGE. 00033 # 00034 # Revision $Id$ 00035 00036 PKG = 'test_roslaunch' 00037 NAME = 'test_core' 00038 00039 import roslib; roslib.load_manifest(PKG) 00040 00041 import os 00042 import sys 00043 import unittest 00044 00045 _lastmsg = None 00046 def printlog_cb(msg): 00047 global _lastmsg 00048 _lastmsg = msg 00049 def printlog_cb_exception(msg): 00050 raise Exception(msg) 00051 00052 ## Test roslaunch.core 00053 class TestCore(unittest.TestCase): 00054 def test_xml_escape(self): 00055 from roslaunch.core import _xml_escape 00056 self.assertEquals('', _xml_escape('')) 00057 self.assertEquals(' ', _xml_escape(' ')) 00058 self.assertEquals('"', _xml_escape('"')) 00059 self.assertEquals('"hello world"', _xml_escape('"hello world"')) 00060 self.assertEquals('>', _xml_escape('>')) 00061 self.assertEquals('<', _xml_escape('<')) 00062 self.assertEquals('&', _xml_escape('&')) 00063 self.assertEquals('&amp;', _xml_escape('&')) 00064 self.assertEquals('&"><"', _xml_escape('&"><"')) 00065 00066 00067 def test_child_mode(self): 00068 from roslaunch.core import is_child_mode, set_child_mode 00069 self.failIf(is_child_mode()) 00070 set_child_mode(True) 00071 self.assert_(is_child_mode()) 00072 set_child_mode(False) 00073 self.failIf(is_child_mode()) 00074 00075 def test_printlog(self): 00076 from roslaunch.core import add_printlog_handler, add_printerrlog_handler, printlog, printlog_bold, printerrlog 00077 add_printlog_handler(printlog_cb) 00078 add_printlog_handler(printlog_cb_exception) 00079 add_printerrlog_handler(printlog_cb) 00080 add_printerrlog_handler(printlog_cb_exception) 00081 00082 #can't really test functionality, just make sure it doesn't crash 00083 global _lastmsg 00084 _lastmsg = None 00085 printlog('foo') 00086 self.assertEquals('foo', _lastmsg) 00087 printlog_bold('bar') 00088 self.assertEquals('bar', _lastmsg) 00089 printerrlog('baz') 00090 self.assertEquals('baz', _lastmsg) 00091 00092 def test_setup_env(self): 00093 from roslaunch.core import setup_env, Node, Machine 00094 ros_root = '/ros/root1' 00095 rpp = '/rpp1' 00096 master_uri = 'http://masteruri:1234' 00097 00098 n = Node('nodepkg','nodetype') 00099 m = Machine('name1', ros_root, rpp, '1.2.3.4') 00100 d = setup_env(n, m, master_uri) 00101 self.assertEquals(d['ROS_MASTER_URI'], master_uri) 00102 self.assertEquals(d['ROS_ROOT'], ros_root) 00103 self.assertEquals(d['PYTHONPATH'], os.path.join(ros_root, 'core', 'roslib', 'src')) 00104 self.assertEquals(d['ROS_PACKAGE_PATH'], rpp) 00105 for k in ['ROS_IP', 'ROS_NAMESPACE']: 00106 if k in d: 00107 self.fail('%s should not be set: %s'%(k,d[k])) 00108 00109 # don't set ROS_ROOT and ROS_PACKAGE_PATH. ROS_ROOT should default, ROS_PACKAGE_PATH does not 00110 m = Machine('name1', '', '', '1.2.3.4') 00111 d = setup_env(n, m, master_uri) 00112 val = os.environ['ROS_ROOT'] 00113 self.assertEquals(d['ROS_ROOT'], val) 00114 self.assertEquals(d['PYTHONPATH'], os.path.join(val, 'core', 'roslib', 'src')) 00115 self.failIf('ROS_PACKAGE_PATH' in d, 'ROS_PACKAGE_PATH should not be set: %s'%d) 00116 00117 # test ROS_IP 00118 m = Machine('name1', ros_root, rpp, '1.2.3.4', ros_ip="4.5.6.7") 00119 n = Node('nodepkg','nodetype', namespace="/ns1") 00120 d = setup_env(n, m, master_uri) 00121 self.assertEquals(d['ROS_NAMESPACE'], "/ns1") 00122 # test stripping 00123 n = Node('nodepkg','nodetype', namespace="/ns2/") 00124 d = setup_env(n, m, master_uri) 00125 self.assertEquals(d['ROS_NAMESPACE'], "/ns2") 00126 00127 # test ROS_NAMESPACE 00128 m = Machine('name1', ros_root, rpp, '1.2.3.4', ros_ip="4.5.6.7") 00129 d = setup_env(n, m, master_uri) 00130 self.assertEquals(d['ROS_IP'], "4.5.6.7") 00131 00132 # test node.env_args 00133 n = Node('nodepkg','nodetype', env_args=[('NENV1', 'val1'), ('NENV2', 'val2'), ('ROS_ROOT', '/new/root')]) 00134 d = setup_env(n, m, master_uri) 00135 self.assertEquals(d['ROS_ROOT'], "/new/root") 00136 self.assertEquals(d['NENV1'], "val1") 00137 self.assertEquals(d['NENV2'], "val2") 00138 00139 # test machine.env_args 00140 m = Machine('name1', ros_root, rpp, '1.2.3.4', ros_ip="4.5.6.7", env_args=[('MENV1', 'val1'), ('MENV2', 'val2'), ('ROS_ROOT', '/new/root2')]) 00141 n = Node('nodepkg','nodetype') 00142 d = setup_env(n, m, master_uri) 00143 self.assertEquals(d['ROS_ROOT'], "/new/root2") 00144 self.assertEquals(d['MENV1'], "val1") 00145 self.assertEquals(d['MENV2'], "val2") 00146 00147 # test node.env_args precedence 00148 m = Machine('name1', ros_root, rpp, '1.2.3.4', ros_ip="4.5.6.7", env_args=[('MENV1', 'val1'), ('MENV2', 'val2')]) 00149 n = Node('nodepkg','nodetype', env_args=[('MENV1', 'nodeval1')]) 00150 d = setup_env(n, m, master_uri) 00151 self.assertEquals(d['MENV1'], "nodeval1") 00152 self.assertEquals(d['MENV2'], "val2") 00153 00154 00155 00156 def test_Machine(self): 00157 from roslaunch.core import Machine 00158 m = Machine('name1', '/ros/root1', '/rpp1', '1.2.3.4') 00159 str(m), m.config_key() #test for error 00160 self.assertEquals(m, m) 00161 self.assertEquals(m, Machine('name1', '/ros/root1', '/rpp1', '1.2.3.4')) 00162 # verify that config_equals is not tied to name or assignable, but is tied to other properties 00163 self.assert_(m.config_equals(Machine('name1b', '/ros/root1', '/rpp1', '1.2.3.4'))) 00164 self.assert_(m.config_equals(Machine('name1c', '/ros/root1', '/rpp1', '1.2.3.4', assignable=False))) 00165 self.failIf(m.config_equals(Machine('name1d', '/ros/root2', '/rpp1', '1.2.3.4'))) 00166 self.failIf(m.config_equals(Machine('name1e', '/ros/root1', '/rpp2', '1.2.3.4'))) 00167 self.failIf(m.config_equals(Machine('name1f', '/ros/root1', '/rpp1', '2.2.3.4'))) 00168 00169 self.assertEquals(m.name, 'name1') 00170 self.assertEquals(m.ros_root, '/ros/root1') 00171 self.assertEquals(m.ros_package_path, '/rpp1') 00172 self.assertEquals(m.address, '1.2.3.4') 00173 self.assertEquals(m.assignable, True) 00174 self.assertEquals(m.ssh_port, 22) 00175 self.assertEquals(m.env_args, []) 00176 for p in ['ros_ip', 'user', 'password']: 00177 self.assertEquals(getattr(m, p), None) 00178 00179 m = Machine('name2', '/ros/root2', '/rpp2', '2.2.3.4', assignable=False) 00180 self.assertEquals(m.assignable, False) 00181 str(m), m.config_key() #test for error 00182 self.assertEquals(m, m) 00183 self.assertEquals(m, Machine('name2', '/ros/root2', '/rpp2', '2.2.3.4', assignable=False)) 00184 self.assert_(m.config_equals(Machine('name2b', '/ros/root2', '/rpp2', '2.2.3.4', assignable=False))) 00185 self.assert_(m.config_equals(Machine('name2c', '/ros/root2', '/rpp2', '2.2.3.4', assignable=True))) 00186 00187 m = Machine('name3', '/ros/root3', '/rpp3', '3.3.3.4', ros_ip='4.5.6.7') 00188 self.assertEquals(m.ros_ip, '4.5.6.7') 00189 str(m) #test for error 00190 self.assertEquals(m, m) 00191 self.assertEquals(m, Machine('name3', '/ros/root3', '/rpp3', '3.3.3.4', ros_ip='4.5.6.7')) 00192 self.assert_(m.config_equals(Machine('name3b', '/ros/root3', '/rpp3', '3.3.3.4', ros_ip='4.5.6.7'))) 00193 00194 m = Machine('name4', '/ros/root4', '/rpp4', '4.4.3.4', user='user4') 00195 self.assertEquals(m.user, 'user4') 00196 str(m), m.config_key() #test for error 00197 self.assertEquals(m, m) 00198 self.assertEquals(m, Machine('name4', '/ros/root4', '/rpp4', '4.4.3.4', user='user4')) 00199 self.assert_(m.config_equals(Machine('name4b', '/ros/root4', '/rpp4', '4.4.3.4', user='user4'))) 00200 self.failIf(m.config_equals(Machine('name4b', '/ros/root4', '/rpp4', '4.4.3.4', user='user4b'))) 00201 00202 m = Machine('name5', '/ros/root5', '/rpp5', '5.5.3.4', password='password5') 00203 self.assertEquals(m.password, 'password5') 00204 str(m), m.config_key() #test for error 00205 self.assertEquals(m, m) 00206 self.assertEquals(m, Machine('name5', '/ros/root5', '/rpp5', '5.5.3.4', password='password5')) 00207 self.assert_(m.config_equals(Machine('name5b', '/ros/root5', '/rpp5', '5.5.3.4', password='password5'))) 00208 self.failIf(m.config_equals(Machine('name5c', '/ros/root5', '/rpp5', '5.5.3.4', password='password5c'))) 00209 00210 m = Machine('name6', '/ros/root6', '/rpp6', '6.6.3.4', ssh_port=23) 00211 self.assertEquals(m.ssh_port, 23) 00212 str(m) #test for error 00213 m.config_key() 00214 self.assertEquals(m, m) 00215 self.assertEquals(m, Machine('name6', '/ros/root6', '/rpp6', '6.6.3.4', ssh_port=23)) 00216 self.assert_(m.config_equals(Machine('name6b', '/ros/root6', '/rpp6', '6.6.3.4', ssh_port=23))) 00217 self.failIf(m.config_equals(Machine('name6c', '/ros/root6', '/rpp6', '6.6.3.4', ssh_port=24))) 00218 00219 m = Machine('name7', '/ros/root7', '/rpp7', '7.7.3.4', env_args=[('ARG', 'val'), ('ARG2', 'val')]) 00220 self.assertEquals(m.env_args, [('ARG', 'val'), ('ARG2', 'val')]) 00221 str(m), m.config_key() #test for error 00222 self.assertEquals(m, m) 00223 self.assertEquals(m, Machine('name7', '/ros/root7', '/rpp7', '7.7.3.4', env_args=[('ARG', 'val'), ('ARG2', 'val')])) 00224 self.assert_(m.config_equals(Machine('name7b', '/ros/root7', '/rpp7', '7.7.3.4', env_args=[('ARG', 'val'), ('ARG2', 'val')]))) 00225 self.assert_(m.config_equals(Machine('name7b', '/ros/root7', '/rpp7', '7.7.3.4', env_args=[('ARG2', 'val'), ('ARG', 'val')]))) 00226 self.failIf(m.config_equals(Machine('name7c', '/ros/root7', '/rpp7', '7.7.3.4', env_args=[('ARGc', 'valc'), ('ARG2', 'val')]))) 00227 00228 def test_local_machine(self): 00229 try: 00230 env_copy = os.environ.copy() 00231 if 'ROS_IP' in os.environ: 00232 del os.environ['ROS_IP'] 00233 if 'ROS_HOSTNAME' in os.environ: 00234 del os.environ['ROS_HOSTNAME'] 00235 00236 from roslaunch.core import local_machine 00237 lm = local_machine() 00238 self.failIf(lm is None) 00239 #singleton 00240 self.assert_(lm == local_machine()) 00241 00242 #verify properties 00243 self.assertEquals(lm.ros_root, roslib.rosenv.get_ros_root()) 00244 self.assertEquals(lm.ros_package_path, roslib.rosenv.get_ros_package_path()) 00245 00246 # #1051 important test: this caused regressions up the tree 00247 self.assertEquals(lm.ros_ip, None) 00248 00249 self.assertEquals(lm.name, '') 00250 self.assertEquals(lm.assignable, True) 00251 self.assertEquals(lm.env_args, []) 00252 self.assertEquals(lm.user, None) 00253 self.assertEquals(lm.password, None) 00254 finally: 00255 os.environ = env_copy 00256 00257 00258 def test_Master(self): 00259 from roslaunch.core import Master 00260 old_env = os.environ.get('ROS_MASTER_URI', None) 00261 try: 00262 os.environ['ROS_MASTER_URI'] = 'http://foo:789' 00263 m = Master() 00264 self.assertEquals(m.type, Master.ROSMASTER) 00265 self.assertEquals(m.uri, 'http://foo:789') 00266 self.assertEquals(m, m) 00267 self.assertEquals(m, Master()) 00268 00269 m = Master(Master.ROSMASTER, 'http://foo:1234') 00270 self.assertEquals(m.type, Master.ROSMASTER) 00271 self.assertEquals(m.uri, 'http://foo:1234') 00272 self.assertEquals(m, m) 00273 self.assertEquals(m, Master(Master.ROSMASTER, 'http://foo:1234')) 00274 00275 import xmlrpclib 00276 self.assert_(isinstance(m.get(), xmlrpclib.ServerProxy)) 00277 m.uri = 'http://foo:567' 00278 self.assertEquals(567, m.get_port()) 00279 self.failIf(m.is_running()) 00280 00281 finally: 00282 if old_env is None: 00283 del os.environ['ROS_MASTER_URI'] 00284 else: 00285 os.environ['ROS_MASTER_URI'] = old_env 00286 00287 def test_Executable(self): 00288 from roslaunch.core import Executable, PHASE_SETUP, PHASE_RUN, PHASE_TEARDOWN 00289 00290 e = Executable('ls', ['-alF']) 00291 self.assertEquals('ls', e.command) 00292 self.assertEquals(['-alF'], e.args) 00293 self.assertEquals(PHASE_RUN, e.phase) 00294 self.assertEquals('ls -alF', str(e)) 00295 self.assertEquals('ls -alF', repr(e)) 00296 00297 e = Executable('ls', ['-alF', 'b*'], PHASE_TEARDOWN) 00298 self.assertEquals('ls', e.command) 00299 self.assertEquals(['-alF', 'b*'], e.args) 00300 self.assertEquals(PHASE_TEARDOWN, e.phase) 00301 self.assertEquals('ls -alF b*', str(e)) 00302 self.assertEquals('ls -alF b*', repr(e)) 00303 00304 def test_RosbinExecutable(self): 00305 from roslaunch.core import RosbinExecutable, PHASE_SETUP, PHASE_RUN, PHASE_TEARDOWN 00306 00307 e = RosbinExecutable('ls', ['-alF']) 00308 self.assertEquals('ls', e.command) 00309 self.assertEquals(['-alF'], e.args) 00310 self.assertEquals(PHASE_RUN, e.phase) 00311 self.assertEquals('ros/bin/ls -alF', str(e)) 00312 self.assertEquals('ros/bin/ls -alF', repr(e)) 00313 00314 e = RosbinExecutable('ls', ['-alF', 'b*'], PHASE_TEARDOWN) 00315 self.assertEquals('ls', e.command) 00316 self.assertEquals(['-alF', 'b*'], e.args) 00317 self.assertEquals(PHASE_TEARDOWN, e.phase) 00318 self.assertEquals('ros/bin/ls -alF b*', str(e)) 00319 self.assertEquals('ros/bin/ls -alF b*', repr(e)) 00320 00321 if __name__ == '__main__': 00322 import rostest 00323 rostest.unitrun('test_roslaunch', NAME, TestCore, coverage_packages=['roslaunch.core']) 00324