00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 import roslib; roslib.load_manifest('test_roslaunch')
00037
00038 import os, sys, unittest
00039 import time
00040
00041 import rostest
00042 import roslaunch.pmon
00043 import roslaunch.server
00044
00045 import roslib.xmlrpc
00046 from xmlrpclib import ServerProxy
00047
00048 from roslaunch.server import ProcessListener
00049 class MockProcessListener(ProcessListener):
00050 def process_died(self, name, code):
00051 self.process_name = name
00052 self.exit_code = code
00053
00054
00055 class ProcessMock(roslaunch.pmon.Process):
00056 def __init__(self, package, name, args, env, respawn=False):
00057 super(ProcessMock, self).__init__(package, name, args, env, respawn)
00058 self.stopped = False
00059 def stop(self):
00060 self.stopped = True
00061
00062
00063 class ProcessMonitorMock(object):
00064 def __init__(self):
00065 self.core_procs = []
00066 self.procs = []
00067 self.listeners = []
00068 self.is_shutdown = False
00069
00070 def join(self, timeout=0):
00071 pass
00072
00073 def add_process_listener(self, l):
00074 self.listeners.append(l)
00075
00076 def register(self, p):
00077 self.procs.append(p)
00078
00079 def register_core_proc(self, p):
00080 self.core_procs.append(p)
00081
00082 def registrations_complete(self):
00083 pass
00084
00085 def unregister(self, p):
00086 self.procs.remove(p)
00087
00088 def has_process(self, name):
00089 return len([p for p in self.procs if p.name == name]) > 0
00090
00091 def get_process(self, name):
00092 val = [p for p in self.procs if p.name == name]
00093 if val:
00094 return val[0]
00095 return None
00096
00097 def has_main_thread_jobs(self):
00098 return False
00099
00100 def do_main_thread_jobs(self):
00101 pass
00102
00103 def kill_process(self, name):
00104 pass
00105
00106 def shutdown(self):
00107 self.is_shutdown = True
00108
00109 def get_active_names(self):
00110 return [p.name for p in self.procs]
00111
00112 def get_process_names_with_spawn_count(self):
00113 actives = [(p.name, p.spawn_count) for p in self.procs]
00114 deads = []
00115 return [actives, deads]
00116
00117 def mainthread_spin_once(self):
00118 pass
00119
00120 def mainthread_spin(self):
00121 pass
00122
00123 def run(self):
00124 pass
00125
00126 class TestHandler(roslaunch.server.ROSLaunchBaseHandler):
00127 def __init__(self, pm):
00128 super(TestHandler, self).__init__(pm)
00129 self.pinged = None
00130 def ping(self, val):
00131 self.pinged = val
00132 return True
00133
00134
00135 class TestRoslaunchServer(unittest.TestCase):
00136
00137 def setUp(self):
00138 self.pmon = ProcessMonitorMock()
00139 roslaunch.core.clear_printlog_handlers()
00140 roslaunch.core.clear_printerrlog_handlers()
00141
00142 def test_ChildRoslaunchProcess(self):
00143
00144 from roslaunch.server import ChildROSLaunchProcess
00145 name = 'foo-%s'%time.time()
00146 args = [time.time(), time.time()]
00147 env = { 'key-%s'%time.time() : str(time.time()) }
00148 cp = ChildROSLaunchProcess(name, args, env)
00149 self.assertEquals(name, cp.name)
00150 self.assertEquals(args, cp.args)
00151 self.assertEquals(env, cp.env)
00152 self.assertEquals(None, cp.uri)
00153
00154 uri = 'http://foo:1234'
00155 cp.set_uri(uri)
00156 self.assertEquals(uri, cp.uri)
00157
00158 def _succeed(self, retval):
00159 code, msg, val = retval
00160 self.assertEquals(1, code)
00161 self.assert_(type(msg) == str)
00162 return val
00163
00164 def test_ROSLaunchBaseHandler(self):
00165 from roslaunch.server import ROSLaunchBaseHandler
00166
00167 try:
00168 ROSLaunchBaseHandler(None)
00169 self.fail("should not allow pm as None")
00170 except: pass
00171
00172 pmon = self.pmon
00173 h = ROSLaunchBaseHandler(pmon)
00174 self._test_ROSLaunchBaseHandler(h)
00175
00176
00177 def _test_ROSLaunchBaseHandler(self, h):
00178 pmon = self.pmon
00179
00180 self.assertEquals(os.getpid(), self._succeed(h.get_pid()))
00181
00182
00183 process_list = self._succeed(h.list_processes())
00184 self.assertEquals([[], []], process_list)
00185
00186 p = ProcessMock('pack', 'name', [], {})
00187 p.spawn_count = 1
00188 pmon.register(p)
00189
00190 process_list = self._succeed(h.list_processes())
00191 self.assertEquals([('name', 1),], process_list[0])
00192 self.assertEquals([], process_list[1])
00193
00194 p.spawn_count = 2
00195 process_list = self._succeed(h.list_processes())
00196 self.assertEquals([('name', 2),], process_list[0])
00197 self.assertEquals([], process_list[1])
00198
00199 pmon.unregister(p)
00200
00201
00202 code, msg, val = h.process_info('fubar')
00203 self.assertEquals(-1, code)
00204 self.assert_(type(msg) == str)
00205 self.assertEquals({}, val)
00206
00207 pmon.register(p)
00208 self.assertEquals(p.get_info(), self._succeed(h.process_info(p.name)))
00209 pmon.unregister(p)
00210
00211
00212
00213 h.pm = None
00214 code, msg, val = h.get_node_names()
00215 self.assertEquals(0, code)
00216 self.assert_(type(msg) == str)
00217 self.assertEquals([], val)
00218 h.pm = pmon
00219
00220 self.assertEquals(pmon.get_active_names(), self._succeed(h.get_node_names()))
00221 pmon.register(p)
00222 self.assertEquals(pmon.get_active_names(), self._succeed(h.get_node_names()))
00223 pmon.unregister(p)
00224
00225 def test_ROSLaunchParentHandler(self):
00226 from roslaunch.server import ROSLaunchParentHandler
00227 from roslaunch.server import ChildROSLaunchProcess
00228 try:
00229 ROSLaunchParentHandler(None, {}, [])
00230 self.fail("should not allow pm as None")
00231 except: pass
00232
00233 pmon = self.pmon
00234 child_processes = {}
00235 listeners = []
00236 h = ROSLaunchParentHandler(pmon, child_processes, listeners)
00237 self.assertEquals(child_processes, h.child_processes)
00238 self.assertEquals(listeners, h.listeners)
00239 self._test_ROSLaunchBaseHandler(h)
00240
00241 from rosgraph_msgs.msg import Log
00242 h.log('client-1', Log.FATAL, "message")
00243 h.log('client-1', Log.ERROR, "message")
00244 h.log('client-1', Log.DEBUG, "message")
00245 h.log('client-1', Log.INFO, "started with pid 1234")
00246
00247
00248
00249 self._succeed(h.process_died('dead1', -1))
00250
00251 l = roslaunch.pmon.ProcessListener()
00252 listeners.append(l)
00253 self._succeed(h.process_died('dead2', -1))
00254
00255 def bad():
00256 raise Exception("haha")
00257 l.process_died = bad
00258 self._succeed(h.process_died('dead3', -1))
00259
00260
00261
00262
00263 self.assertEquals([], self._succeed(h.list_children()))
00264
00265
00266 code, msg, val = h.register('client-unknown', 'http://unroutable:1234')
00267 self.assertEquals(-1, code)
00268 self.assertEquals([], self._succeed(h.list_children()))
00269
00270
00271 uri = 'http://unroutable:1324'
00272 child_processes['client-1'] = ChildROSLaunchProcess('foo', [], {})
00273 val = self._succeed(h.register('client-1', uri))
00274 self.assert_(type(val) == int)
00275 self.assertEquals([uri], self._succeed(h.list_children()))
00276
00277 def test_ROSLaunchChildHandler(self):
00278 from roslaunch.server import ROSLaunchChildHandler
00279 pmon = self.pmon
00280 import roslib.params
00281 try:
00282
00283 run_id = roslib.params.get_param('/run_id')
00284 except:
00285 run_id = 'foo-%s'%time.time()
00286 name = 'foo-bob'
00287 server_uri = 'http://localhost:12345'
00288 try:
00289 h = ROSLaunchChildHandler(run_id, name, server_uri, None)
00290 self.fail("should not allow pm as None")
00291 except: pass
00292 try:
00293 h = ROSLaunchChildHandler(run_id, name, 'http://bad_uri:0', pmon)
00294 self.fail("should not allow bad uri")
00295 except: pass
00296 try:
00297 h = ROSLaunchChildHandler(run_id, name, None, pmon)
00298 self.fail("should not allow None server_uri")
00299 except: pass
00300
00301 h = ROSLaunchChildHandler(run_id, name, server_uri, pmon)
00302 self.assertEquals(run_id, h.run_id)
00303 self.assertEquals(name, h.name)
00304 self.assertEquals(server_uri, h.server_uri)
00305 self._test_ROSLaunchBaseHandler(h)
00306
00307
00308 from rosgraph_msgs.msg import Log
00309 h._log(Log.INFO, 'hello')
00310
00311
00312
00313 h.shutdown()
00314 self.assert_(pmon.is_shutdown)
00315
00316 self.assert_(h.pm is None)
00317 code, msg, val = h.launch('<launch></launch>')
00318 self.assertEquals(0, code)
00319
00320
00321
00322 def test__ProcessListenerForwarder(self):
00323 from roslaunch.server import _ProcessListenerForwarder, ProcessListener
00324
00325
00326 f = _ProcessListenerForwarder(None)
00327
00328 f.process_died("foo", -1)
00329
00330 l = MockProcessListener()
00331 f = _ProcessListenerForwarder(l)
00332
00333 f.process_died("foo", -1)
00334
00335 self.assertEquals(l.process_name, 'foo')
00336 self.assertEquals(l.exit_code, -1)
00337
00338 def test_ROSLaunchNode(self):
00339
00340 from roslaunch.server import ROSLaunchNode
00341
00342
00343 handler = TestHandler(self.pmon)
00344 node = ROSLaunchNode(handler)
00345
00346
00347 node.start()
00348 self.assert_(node.uri)
00349
00350
00351 s = ServerProxy(node.uri)
00352 test_val = 'test-%s'%time.time()
00353 s.ping(test_val)
00354 self.assertEquals(handler.pinged, test_val)
00355
00356
00357 code, msg, pid = s.get_pid()
00358 self.assertEquals(1, code)
00359 self.assert_(type(msg) == str)
00360 self.assertEquals(os.getpid(), pid)
00361
00362
00363 node.shutdown('test done')
00364
00365 def test_ROSLaunchParentNode(self):
00366 from roslaunch.server import ROSLaunchParentNode
00367 from roslaunch.server import ChildROSLaunchProcess
00368 from roslaunch.config import ROSLaunchConfig
00369 from roslaunch.pmon import ProcessListener
00370 rosconfig = ROSLaunchConfig()
00371 try:
00372 ROSLaunchParentNode(rosconfig, None)
00373 self.fail("should not allow pm as None")
00374 except: pass
00375 pmon = self.pmon
00376 n = ROSLaunchParentNode(rosconfig, pmon)
00377 self.assertEquals(rosconfig, n.rosconfig)
00378 self.assertEquals([], n.listeners)
00379 self.assertEquals({}, n.child_processes)
00380 self.assertEquals(n.handler.listeners, n.listeners)
00381 self.assertEquals(n.handler.child_processes, n.child_processes)
00382
00383
00384 self.assertEquals(n.handler.listeners, n.listeners)
00385 l = ProcessListener()
00386 n.add_process_listener(l)
00387 self.assertEquals([l], n.listeners)
00388 self.assertEquals(n.handler.listeners, n.listeners)
00389
00390
00391 import roslaunch.config
00392 server = roslaunch.server.ROSLaunchParentNode(roslaunch.config.ROSLaunchConfig(), self.pmon)
00393
00394
00395
00396
00397 roslaunch.core.clear_printlog_handlers()
00398 roslaunch.core.clear_printerrlog_handlers()
00399
00400
00401 child_name = 'child-%s'%time.time()
00402 child_proc = ChildROSLaunchProcess('foo', [], {})
00403 server.add_child(child_name, child_proc)
00404
00405 try:
00406 server.start()
00407 self.assert_(server.uri, "server URI did not initialize")
00408 s = ServerProxy(server.uri)
00409 child_uri = 'http://fake-unroutable:1324'
00410
00411 val = self._succeed(s.list_children())
00412 self.assertEquals([], val)
00413
00414 val = self._succeed(s.register(child_name, child_uri))
00415 self.assertEquals(1, val)
00416
00417 val = self._succeed(s.list_children())
00418 self.assertEquals([child_uri], val)
00419 finally:
00420 server.shutdown('test done')
00421
00422
00423
00424 def test_ROSLaunchChildNode(self):
00425 from roslaunch.server import ROSLaunchChildNode
00426 from roslaunch.server import ChildROSLaunchProcess
00427 pmon = self.pmon
00428 import roslib.params
00429 try:
00430
00431 run_id = roslib.params.get_param('/run_id')
00432 except:
00433 run_id = 'foo-%s'%time.time()
00434 name = 'foo-bob'
00435 server_uri = 'http://localhost:12345'
00436 try:
00437 ROSLaunchChildNode(run_id, name, server_uri, None)
00438 self.fail("should not allow pm as None")
00439 except: pass
00440 try:
00441 ROSLaunchChildNode(run_id, name, 'http://bad_uri:0', pmon)
00442 self.fail("should not allow bad uri")
00443 except: pass
00444
00445 n = ROSLaunchChildNode(run_id, name, server_uri, pmon)
00446 self.assertEquals(run_id, n.run_id)
00447 self.assertEquals(name, n.name)
00448 self.assertEquals(server_uri, n.server_uri)
00449
00450
00451 import roslaunch.config
00452 server = roslaunch.server.ROSLaunchParentNode(roslaunch.config.ROSLaunchConfig(), self.pmon)
00453
00454 child_proc = ChildROSLaunchProcess('foo', [], {})
00455 server.add_child(name, child_proc)
00456
00457 try:
00458 server.start()
00459 self.assert_(server.uri, "server URI did not initialize")
00460 s = ServerProxy(server.uri)
00461
00462 print "SERVER STARTED"
00463
00464
00465 n = ROSLaunchChildNode(run_id, name, server.uri, pmon)
00466 n.start()
00467 print "CHILD STARTED"
00468 self.assert_(n.uri, "child URI did not initialize")
00469
00470
00471 print "VERIFYING REGISTRATION"
00472 self.assertEquals(child_proc.uri, n.uri)
00473 child_uri = 'http://fake-unroutable:1324'
00474
00475 val = self._succeed(s.list_children())
00476 self.assertEquals([n.uri], val)
00477 finally:
00478 print "SHUTTING DOWN"
00479 n.shutdown('test done')
00480 server.shutdown('test done')
00481
00482
00483 if __name__ == '__main__':
00484 rostest.unitrun('test_roslaunch', sys.argv[0], TestRoslaunchServer, coverage_packages=['roslaunch.server'])
00485