00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 import roslib; roslib.load_manifest('test_roslaunch')
00037
00038 import os, sys, unittest
00039 import thread
00040
00041 import rostest
00042 import roslaunch.parent
00043
00044
00045
00046 class ProcessMock(roslaunch.pmon.Process):
00047 def __init__(self, package, name, args, env, respawn=False):
00048 super(ProcessMock, self).__init__(package, name, args, env, respawn)
00049 self.stopped = False
00050 def stop(self):
00051 self.stopped = True
00052
00053
00054 class ProcessMonitorMock(object):
00055 def __init__(self):
00056 self.core_procs = []
00057 self.procs = []
00058 self.listeners = []
00059 self.is_shutdown = False
00060
00061 def join(self, timeout=0):
00062 pass
00063
00064 def add_process_listener(self, l):
00065 self.listeners.append(l)
00066
00067 def register(self, p):
00068 self.procs.append(p)
00069
00070 def register_core_proc(self, p):
00071 self.core_procs.append(p)
00072
00073 def registrations_complete(self):
00074 pass
00075
00076 def unregister(self, p):
00077 self.procs.remove(p)
00078
00079 def has_process(self, name):
00080 return len([p for p in self.procs if p.name == name]) > 0
00081
00082 def get_process(self, name):
00083 val = [p for p in self.procs if p.name == name]
00084 if val:
00085 return val[0]
00086 return None
00087
00088 def has_main_thread_jobs(self):
00089 return False
00090
00091 def do_main_thread_jobs(self):
00092 pass
00093
00094 def kill_process(self, name):
00095 pass
00096
00097 def shutdown(self):
00098 self.is_shutdown = True
00099
00100 def get_active_names(self):
00101 return [p.name for p in self.procs]
00102
00103 def get_process_names_with_spawn_count(self):
00104 actives = [(p.name, p.spawn_count) for p in self.procs]
00105 deads = []
00106 return [actives, deads]
00107
00108 def mainthread_spin_once(self):
00109 pass
00110
00111 def mainthread_spin(self):
00112 pass
00113
00114 def run(self):
00115 pass
00116
00117 import time
00118 from xmlrpclib import ServerProxy
00119
00120 class TestRoslaunchParent(unittest.TestCase):
00121
00122 def setUp(self):
00123 self.pmon = ProcessMonitorMock()
00124
00125
00126 def test_ROSLaunchParent(self):
00127 try:
00128 self._subtest_ROSLaunchParent()
00129 finally:
00130 self.pmon.shutdown()
00131
00132 def _subtest_ROSLaunchParent(self):
00133 from roslaunch.parent import ROSLaunchParent
00134 pmon = self.pmon
00135 import roslib.params
00136 try:
00137
00138 run_id = roslib.params.get_param('/run_id')
00139 except:
00140 run_id = 'test-rl-parent-%s'%time.time()
00141 name = 'foo-bob'
00142 server_uri = 'http://localhost:12345'
00143
00144 p = ROSLaunchParent(run_id, [], is_core = True, port=None, local_only=False)
00145 self.assertEquals(run_id, p.run_id)
00146 self.assertEquals(True, p.is_core)
00147 self.assertEquals(False, p.local_only)
00148
00149 import roslib.packages
00150 rl_dir = roslib.packages.get_pkg_dir('roslaunch')
00151 rl_file = os.path.join(rl_dir, 'example.launch')
00152 self.assert_(os.path.isfile(rl_file))
00153
00154
00155 p = ROSLaunchParent(run_id, [rl_file], is_core = False, port=None, local_only=True)
00156 self.assertEquals(run_id, p.run_id)
00157 self.assertEquals(False, p.is_core)
00158 self.assertEquals(True, p.local_only)
00159
00160 self.assert_(p.config is None)
00161 p._load_config()
00162 self.assert_(p.config is not None)
00163 self.assert_(p.config.nodes)
00164
00165
00166 p = ROSLaunchParent(run_id, [rl_file], is_core = False, port=11312, local_only=True)
00167 self.assertEquals(11312, p.port)
00168 self.assert_(p.config is None)
00169 p._load_config()
00170
00171 _, port = roslib.network.parse_http_host_and_port(p.config.master.uri)
00172 self.assertEquals(11312, port)
00173
00174
00175 p = ROSLaunchParent(run_id, ['non-existent-fake.launch'])
00176 self.assert_(p.config is None)
00177 try:
00178 p._load_config()
00179 self.fail("load config should have failed due to bad rl file")
00180 except roslaunch.core.RLException: pass
00181
00182
00183 rl_dir = roslib.packages.get_pkg_dir('test_roslaunch')
00184 rl_file = os.path.join(rl_dir, 'test', 'xml', 'test-params-invalid-1.xml')
00185 self.assert_(os.path.isfile(rl_file))
00186 p = ROSLaunchParent(run_id, [rl_file])
00187 self.assert_(p.config is None)
00188 try:
00189 p._load_config()
00190 self.fail("load config should have failed due to bad rl file")
00191 except roslaunch.core.RLException: pass
00192
00193
00194 if 0:
00195 p = ROSLaunchParent(run_id, [], is_core = False, port=None, local_only=True)
00196 self.assertEquals(run_id, p.run_id)
00197 self.assertEquals(False, p.is_core)
00198 self.assertEquals(True, p.local_only)
00199
00200 thread.start_new_thread(kill_parent, (p,))
00201 p.start()
00202 p.spin()
00203
00204
00205 p = ROSLaunchParent(run_id, [], is_core = False, port=None, local_only=True)
00206
00207 for m in ['_init_runner', '_init_remote', '_start_server']:
00208 try:
00209 getattr(p, m)()
00210 self.fail('should have raised')
00211 except roslaunch.core.RLException: pass
00212
00213
00214 p.config = roslaunch.config.ROSLaunchConfig()
00215
00216
00217 for m in ['_init_runner', '_init_remote', '_start_server']:
00218 try:
00219 getattr(p, m)()
00220 self.fail('should have raised')
00221 except roslaunch.core.RLException: pass
00222
00223
00224 p.pm = pmon
00225
00226 for m in ['_init_runner', '_init_remote']:
00227 try:
00228 getattr(p, m)()
00229 self.fail('should have raised')
00230 except roslaunch.core.RLException: pass
00231
00232 from roslaunch.server import ROSLaunchParentNode
00233 p.server = ROSLaunchParentNode(p.config, pmon)
00234 p._init_runner()
00235
00236 self.assert_(p.runner is not None)
00237
00238
00239 p.local_only = True
00240 p._init_remote()
00241 p.local_only = False
00242
00243 def ftrue():
00244 return True
00245 p.config.has_remote_nodes = ftrue
00246 p._init_remote()
00247 self.assert_(p.remote_runner is not None)
00248
00249 self.failIf(pmon.is_shutdown)
00250 p.shutdown()
00251 self.assert_(pmon.is_shutdown)
00252
00253 def kill_parent(p, delay=1.0):
00254
00255 import time
00256 time.sleep(delay)
00257 print "stopping parent"
00258 p.shutdown()
00259
00260 if __name__ == '__main__':
00261 rostest.unitrun('test_roslaunch', sys.argv[0], TestRoslaunchParent, coverage_packages=['roslaunch.parent'])
00262