00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 PKG = 'test_roslaunch'
00037 NAME = 'test_core'
00038
00039 import roslib; roslib.load_manifest(PKG)
00040
00041 import os
00042 import sys
00043 import unittest
00044
00045 _lastmsg = None
00046 def printlog_cb(msg):
00047 global _lastmsg
00048 _lastmsg = msg
00049 def printlog_cb_exception(msg):
00050 raise Exception(msg)
00051
00052
00053 class TestCore(unittest.TestCase):
00054 def test_xml_escape(self):
00055 from roslaunch.core import _xml_escape
00056 self.assertEquals('', _xml_escape(''))
00057 self.assertEquals(' ', _xml_escape(' '))
00058 self.assertEquals('"', _xml_escape('"'))
00059 self.assertEquals('"hello world"', _xml_escape('"hello world"'))
00060 self.assertEquals('>', _xml_escape('>'))
00061 self.assertEquals('<', _xml_escape('<'))
00062 self.assertEquals('&', _xml_escape('&'))
00063 self.assertEquals('&amp;', _xml_escape('&'))
00064 self.assertEquals('&"><"', _xml_escape('&"><"'))
00065
00066
00067 def test_child_mode(self):
00068 from roslaunch.core import is_child_mode, set_child_mode
00069 self.failIf(is_child_mode())
00070 set_child_mode(True)
00071 self.assert_(is_child_mode())
00072 set_child_mode(False)
00073 self.failIf(is_child_mode())
00074
00075 def test_printlog(self):
00076 from roslaunch.core import add_printlog_handler, add_printerrlog_handler, printlog, printlog_bold, printerrlog
00077 add_printlog_handler(printlog_cb)
00078 add_printlog_handler(printlog_cb_exception)
00079 add_printerrlog_handler(printlog_cb)
00080 add_printerrlog_handler(printlog_cb_exception)
00081
00082
00083 global _lastmsg
00084 _lastmsg = None
00085 printlog('foo')
00086 self.assertEquals('foo', _lastmsg)
00087 printlog_bold('bar')
00088 self.assertEquals('bar', _lastmsg)
00089 printerrlog('baz')
00090 self.assertEquals('baz', _lastmsg)
00091
00092 def test_setup_env(self):
00093 from roslaunch.core import setup_env, Node, Machine
00094 ros_root = '/ros/root1'
00095 rpp = '/rpp1'
00096 master_uri = 'http://masteruri:1234'
00097
00098 n = Node('nodepkg','nodetype')
00099 m = Machine('name1', ros_root, rpp, '1.2.3.4')
00100 d = setup_env(n, m, master_uri)
00101 self.assertEquals(d['ROS_MASTER_URI'], master_uri)
00102 self.assertEquals(d['ROS_ROOT'], ros_root)
00103 self.assertEquals(d['PYTHONPATH'], os.path.join(ros_root, 'core', 'roslib', 'src'))
00104 self.assertEquals(d['ROS_PACKAGE_PATH'], rpp)
00105 for k in ['ROS_IP', 'ROS_NAMESPACE']:
00106 if k in d:
00107 self.fail('%s should not be set: %s'%(k,d[k]))
00108
00109
00110 m = Machine('name1', '', '', '1.2.3.4')
00111 d = setup_env(n, m, master_uri)
00112 val = os.environ['ROS_ROOT']
00113 self.assertEquals(d['ROS_ROOT'], val)
00114 self.assertEquals(d['PYTHONPATH'], os.path.join(val, 'core', 'roslib', 'src'))
00115 self.failIf('ROS_PACKAGE_PATH' in d, 'ROS_PACKAGE_PATH should not be set: %s'%d)
00116
00117
00118 m = Machine('name1', ros_root, rpp, '1.2.3.4', ros_ip="4.5.6.7")
00119 n = Node('nodepkg','nodetype', namespace="/ns1")
00120 d = setup_env(n, m, master_uri)
00121 self.assertEquals(d['ROS_NAMESPACE'], "/ns1")
00122
00123 n = Node('nodepkg','nodetype', namespace="/ns2/")
00124 d = setup_env(n, m, master_uri)
00125 self.assertEquals(d['ROS_NAMESPACE'], "/ns2")
00126
00127
00128 m = Machine('name1', ros_root, rpp, '1.2.3.4', ros_ip="4.5.6.7")
00129 d = setup_env(n, m, master_uri)
00130 self.assertEquals(d['ROS_IP'], "4.5.6.7")
00131
00132
00133 n = Node('nodepkg','nodetype', env_args=[('NENV1', 'val1'), ('NENV2', 'val2'), ('ROS_ROOT', '/new/root')])
00134 d = setup_env(n, m, master_uri)
00135 self.assertEquals(d['ROS_ROOT'], "/new/root")
00136 self.assertEquals(d['NENV1'], "val1")
00137 self.assertEquals(d['NENV2'], "val2")
00138
00139
00140 m = Machine('name1', ros_root, rpp, '1.2.3.4', ros_ip="4.5.6.7", env_args=[('MENV1', 'val1'), ('MENV2', 'val2'), ('ROS_ROOT', '/new/root2')])
00141 n = Node('nodepkg','nodetype')
00142 d = setup_env(n, m, master_uri)
00143 self.assertEquals(d['ROS_ROOT'], "/new/root2")
00144 self.assertEquals(d['MENV1'], "val1")
00145 self.assertEquals(d['MENV2'], "val2")
00146
00147
00148 m = Machine('name1', ros_root, rpp, '1.2.3.4', ros_ip="4.5.6.7", env_args=[('MENV1', 'val1'), ('MENV2', 'val2')])
00149 n = Node('nodepkg','nodetype', env_args=[('MENV1', 'nodeval1')])
00150 d = setup_env(n, m, master_uri)
00151 self.assertEquals(d['MENV1'], "nodeval1")
00152 self.assertEquals(d['MENV2'], "val2")
00153
00154
00155
00156 def test_Machine(self):
00157 from roslaunch.core import Machine
00158 m = Machine('name1', '/ros/root1', '/rpp1', '1.2.3.4')
00159 str(m), m.config_key()
00160 self.assertEquals(m, m)
00161 self.assertEquals(m, Machine('name1', '/ros/root1', '/rpp1', '1.2.3.4'))
00162
00163 self.assert_(m.config_equals(Machine('name1b', '/ros/root1', '/rpp1', '1.2.3.4')))
00164 self.assert_(m.config_equals(Machine('name1c', '/ros/root1', '/rpp1', '1.2.3.4', assignable=False)))
00165 self.failIf(m.config_equals(Machine('name1d', '/ros/root2', '/rpp1', '1.2.3.4')))
00166 self.failIf(m.config_equals(Machine('name1e', '/ros/root1', '/rpp2', '1.2.3.4')))
00167 self.failIf(m.config_equals(Machine('name1f', '/ros/root1', '/rpp1', '2.2.3.4')))
00168
00169 self.assertEquals(m.name, 'name1')
00170 self.assertEquals(m.ros_root, '/ros/root1')
00171 self.assertEquals(m.ros_package_path, '/rpp1')
00172 self.assertEquals(m.address, '1.2.3.4')
00173 self.assertEquals(m.assignable, True)
00174 self.assertEquals(m.ssh_port, 22)
00175 self.assertEquals(m.env_args, [])
00176 for p in ['ros_ip', 'user', 'password']:
00177 self.assertEquals(getattr(m, p), None)
00178
00179 m = Machine('name2', '/ros/root2', '/rpp2', '2.2.3.4', assignable=False)
00180 self.assertEquals(m.assignable, False)
00181 str(m), m.config_key()
00182 self.assertEquals(m, m)
00183 self.assertEquals(m, Machine('name2', '/ros/root2', '/rpp2', '2.2.3.4', assignable=False))
00184 self.assert_(m.config_equals(Machine('name2b', '/ros/root2', '/rpp2', '2.2.3.4', assignable=False)))
00185 self.assert_(m.config_equals(Machine('name2c', '/ros/root2', '/rpp2', '2.2.3.4', assignable=True)))
00186
00187 m = Machine('name3', '/ros/root3', '/rpp3', '3.3.3.4', ros_ip='4.5.6.7')
00188 self.assertEquals(m.ros_ip, '4.5.6.7')
00189 str(m)
00190 self.assertEquals(m, m)
00191 self.assertEquals(m, Machine('name3', '/ros/root3', '/rpp3', '3.3.3.4', ros_ip='4.5.6.7'))
00192 self.assert_(m.config_equals(Machine('name3b', '/ros/root3', '/rpp3', '3.3.3.4', ros_ip='4.5.6.7')))
00193
00194 m = Machine('name4', '/ros/root4', '/rpp4', '4.4.3.4', user='user4')
00195 self.assertEquals(m.user, 'user4')
00196 str(m), m.config_key()
00197 self.assertEquals(m, m)
00198 self.assertEquals(m, Machine('name4', '/ros/root4', '/rpp4', '4.4.3.4', user='user4'))
00199 self.assert_(m.config_equals(Machine('name4b', '/ros/root4', '/rpp4', '4.4.3.4', user='user4')))
00200 self.failIf(m.config_equals(Machine('name4b', '/ros/root4', '/rpp4', '4.4.3.4', user='user4b')))
00201
00202 m = Machine('name5', '/ros/root5', '/rpp5', '5.5.3.4', password='password5')
00203 self.assertEquals(m.password, 'password5')
00204 str(m), m.config_key()
00205 self.assertEquals(m, m)
00206 self.assertEquals(m, Machine('name5', '/ros/root5', '/rpp5', '5.5.3.4', password='password5'))
00207 self.assert_(m.config_equals(Machine('name5b', '/ros/root5', '/rpp5', '5.5.3.4', password='password5')))
00208 self.failIf(m.config_equals(Machine('name5c', '/ros/root5', '/rpp5', '5.5.3.4', password='password5c')))
00209
00210 m = Machine('name6', '/ros/root6', '/rpp6', '6.6.3.4', ssh_port=23)
00211 self.assertEquals(m.ssh_port, 23)
00212 str(m)
00213 m.config_key()
00214 self.assertEquals(m, m)
00215 self.assertEquals(m, Machine('name6', '/ros/root6', '/rpp6', '6.6.3.4', ssh_port=23))
00216 self.assert_(m.config_equals(Machine('name6b', '/ros/root6', '/rpp6', '6.6.3.4', ssh_port=23)))
00217 self.failIf(m.config_equals(Machine('name6c', '/ros/root6', '/rpp6', '6.6.3.4', ssh_port=24)))
00218
00219 m = Machine('name7', '/ros/root7', '/rpp7', '7.7.3.4', env_args=[('ARG', 'val'), ('ARG2', 'val')])
00220 self.assertEquals(m.env_args, [('ARG', 'val'), ('ARG2', 'val')])
00221 str(m), m.config_key()
00222 self.assertEquals(m, m)
00223 self.assertEquals(m, Machine('name7', '/ros/root7', '/rpp7', '7.7.3.4', env_args=[('ARG', 'val'), ('ARG2', 'val')]))
00224 self.assert_(m.config_equals(Machine('name7b', '/ros/root7', '/rpp7', '7.7.3.4', env_args=[('ARG', 'val'), ('ARG2', 'val')])))
00225 self.assert_(m.config_equals(Machine('name7b', '/ros/root7', '/rpp7', '7.7.3.4', env_args=[('ARG2', 'val'), ('ARG', 'val')])))
00226 self.failIf(m.config_equals(Machine('name7c', '/ros/root7', '/rpp7', '7.7.3.4', env_args=[('ARGc', 'valc'), ('ARG2', 'val')])))
00227
00228 def test_local_machine(self):
00229 try:
00230 env_copy = os.environ.copy()
00231 if 'ROS_IP' in os.environ:
00232 del os.environ['ROS_IP']
00233 if 'ROS_HOSTNAME' in os.environ:
00234 del os.environ['ROS_HOSTNAME']
00235
00236 from roslaunch.core import local_machine
00237 lm = local_machine()
00238 self.failIf(lm is None)
00239
00240 self.assert_(lm == local_machine())
00241
00242
00243 self.assertEquals(lm.ros_root, roslib.rosenv.get_ros_root())
00244 self.assertEquals(lm.ros_package_path, roslib.rosenv.get_ros_package_path())
00245
00246
00247 self.assertEquals(lm.ros_ip, None)
00248
00249 self.assertEquals(lm.name, '')
00250 self.assertEquals(lm.assignable, True)
00251 self.assertEquals(lm.env_args, [])
00252 self.assertEquals(lm.user, None)
00253 self.assertEquals(lm.password, None)
00254 finally:
00255 os.environ = env_copy
00256
00257
00258 def test_Master(self):
00259 from roslaunch.core import Master
00260 old_env = os.environ.get('ROS_MASTER_URI', None)
00261 try:
00262 os.environ['ROS_MASTER_URI'] = 'http://foo:789'
00263 m = Master()
00264 self.assertEquals(m.type, Master.ROSMASTER)
00265 self.assertEquals(m.uri, 'http://foo:789')
00266 self.assertEquals(m, m)
00267 self.assertEquals(m, Master())
00268
00269 m = Master(Master.ROSMASTER, 'http://foo:1234')
00270 self.assertEquals(m.type, Master.ROSMASTER)
00271 self.assertEquals(m.uri, 'http://foo:1234')
00272 self.assertEquals(m, m)
00273 self.assertEquals(m, Master(Master.ROSMASTER, 'http://foo:1234'))
00274
00275 import xmlrpclib
00276 self.assert_(isinstance(m.get(), xmlrpclib.ServerProxy))
00277 m.uri = 'http://foo:567'
00278 self.assertEquals(567, m.get_port())
00279 self.failIf(m.is_running())
00280
00281 finally:
00282 if old_env is None:
00283 del os.environ['ROS_MASTER_URI']
00284 else:
00285 os.environ['ROS_MASTER_URI'] = old_env
00286
00287 def test_Executable(self):
00288 from roslaunch.core import Executable, PHASE_SETUP, PHASE_RUN, PHASE_TEARDOWN
00289
00290 e = Executable('ls', ['-alF'])
00291 self.assertEquals('ls', e.command)
00292 self.assertEquals(['-alF'], e.args)
00293 self.assertEquals(PHASE_RUN, e.phase)
00294 self.assertEquals('ls -alF', str(e))
00295 self.assertEquals('ls -alF', repr(e))
00296
00297 e = Executable('ls', ['-alF', 'b*'], PHASE_TEARDOWN)
00298 self.assertEquals('ls', e.command)
00299 self.assertEquals(['-alF', 'b*'], e.args)
00300 self.assertEquals(PHASE_TEARDOWN, e.phase)
00301 self.assertEquals('ls -alF b*', str(e))
00302 self.assertEquals('ls -alF b*', repr(e))
00303
00304 def test_RosbinExecutable(self):
00305 from roslaunch.core import RosbinExecutable, PHASE_SETUP, PHASE_RUN, PHASE_TEARDOWN
00306
00307 e = RosbinExecutable('ls', ['-alF'])
00308 self.assertEquals('ls', e.command)
00309 self.assertEquals(['-alF'], e.args)
00310 self.assertEquals(PHASE_RUN, e.phase)
00311 self.assertEquals('ros/bin/ls -alF', str(e))
00312 self.assertEquals('ros/bin/ls -alF', repr(e))
00313
00314 e = RosbinExecutable('ls', ['-alF', 'b*'], PHASE_TEARDOWN)
00315 self.assertEquals('ls', e.command)
00316 self.assertEquals(['-alF', 'b*'], e.args)
00317 self.assertEquals(PHASE_TEARDOWN, e.phase)
00318 self.assertEquals('ros/bin/ls -alF b*', str(e))
00319 self.assertEquals('ros/bin/ls -alF b*', repr(e))
00320
00321 if __name__ == '__main__':
00322 import rostest
00323 rostest.unitrun('test_roslaunch', NAME, TestCore, coverage_packages=['roslaunch.core'])
00324