$search
00001 #!/usr/bin/env python 00002 # Software License Agreement (BSD License) 00003 # 00004 # Copyright (c) 2008, Willow Garage, Inc. 00005 # All rights reserved. 00006 # 00007 # Redistribution and use in source and binary forms, with or without 00008 # modification, are permitted provided that the following conditions 00009 # are met: 00010 # 00011 # * Redistributions of source code must retain the above copyright 00012 # notice, this list of conditions and the following disclaimer. 00013 # * Redistributions in binary form must reproduce the above 00014 # copyright notice, this list of conditions and the following 00015 # disclaimer in the documentation and/or other materials provided 00016 # with the distribution. 00017 # * Neither the name of Willow Garage, Inc. nor the names of its 00018 # contributors may be used to endorse or promote products derived 00019 # from this software without specific prior written permission. 00020 # 00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 # POSSIBILITY OF SUCH DAMAGE. 00033 # 00034 # Revision $Id$ 00035 00036 import roslib; roslib.load_manifest('test_roslaunch') 00037 00038 import os, sys, unittest 00039 import time 00040 00041 import rostest 00042 import roslaunch.pmon 00043 import roslaunch.server 00044 00045 import roslib.xmlrpc 00046 from xmlrpclib import ServerProxy 00047 00048 from roslaunch.server import ProcessListener 00049 class MockProcessListener(ProcessListener): 00050 def process_died(self, name, code): 00051 self.process_name = name 00052 self.exit_code = code 00053 00054 ## Fake Process object 00055 class ProcessMock(roslaunch.pmon.Process): 00056 def __init__(self, package, name, args, env, respawn=False): 00057 super(ProcessMock, self).__init__(package, name, args, env, respawn) 00058 self.stopped = False 00059 def stop(self): 00060 self.stopped = True 00061 00062 ## Fake ProcessMonitor object 00063 class ProcessMonitorMock(object): 00064 def __init__(self): 00065 self.core_procs = [] 00066 self.procs = [] 00067 self.listeners = [] 00068 self.is_shutdown = False 00069 00070 def join(self, timeout=0): 00071 pass 00072 00073 def add_process_listener(self, l): 00074 self.listeners.append(l) 00075 00076 def register(self, p): 00077 self.procs.append(p) 00078 00079 def register_core_proc(self, p): 00080 self.core_procs.append(p) 00081 00082 def registrations_complete(self): 00083 pass 00084 00085 def unregister(self, p): 00086 self.procs.remove(p) 00087 00088 def has_process(self, name): 00089 return len([p for p in self.procs if p.name == name]) > 0 00090 00091 def get_process(self, name): 00092 val = [p for p in self.procs if p.name == name] 00093 if val: 00094 return val[0] 00095 return None 00096 00097 def has_main_thread_jobs(self): 00098 return False 00099 00100 def do_main_thread_jobs(self): 00101 pass 00102 00103 def kill_process(self, name): 00104 pass 00105 00106 def shutdown(self): 00107 self.is_shutdown = True 00108 00109 def get_active_names(self): 00110 return [p.name for p in self.procs] 00111 00112 def get_process_names_with_spawn_count(self): 00113 actives = [(p.name, p.spawn_count) for p in self.procs] 00114 deads = [] 00115 return [actives, deads] 00116 00117 def mainthread_spin_once(self): 00118 pass 00119 00120 def mainthread_spin(self): 00121 pass 00122 00123 def run(self): 00124 pass 00125 00126 class TestHandler(roslaunch.server.ROSLaunchBaseHandler): 00127 def __init__(self, pm): 00128 super(TestHandler, self).__init__(pm) 00129 self.pinged = None 00130 def ping(self, val): 00131 self.pinged = val 00132 return True 00133 00134 ## Test roslaunch.server 00135 class TestRoslaunchServer(unittest.TestCase): 00136 00137 def setUp(self): 00138 self.pmon = ProcessMonitorMock() 00139 roslaunch.core.clear_printlog_handlers() 00140 roslaunch.core.clear_printerrlog_handlers() 00141 00142 def test_ChildRoslaunchProcess(self): 00143 # test that constructor is wired up correctly 00144 from roslaunch.server import ChildROSLaunchProcess 00145 name = 'foo-%s'%time.time() 00146 args = [time.time(), time.time()] 00147 env = { 'key-%s'%time.time() : str(time.time()) } 00148 cp = ChildROSLaunchProcess(name, args, env) 00149 self.assertEquals(name, cp.name) 00150 self.assertEquals(args, cp.args) 00151 self.assertEquals(env, cp.env) 00152 self.assertEquals(None, cp.uri) 00153 00154 uri = 'http://foo:1234' 00155 cp.set_uri(uri) 00156 self.assertEquals(uri, cp.uri) 00157 00158 def _succeed(self, retval): 00159 code, msg, val = retval 00160 self.assertEquals(1, code) 00161 self.assert_(type(msg) == str) 00162 return val 00163 00164 def test_ROSLaunchBaseHandler(self): 00165 from roslaunch.server import ROSLaunchBaseHandler 00166 00167 try: 00168 ROSLaunchBaseHandler(None) 00169 self.fail("should not allow pm as None") 00170 except: pass 00171 00172 pmon = self.pmon 00173 h = ROSLaunchBaseHandler(pmon) 00174 self._test_ROSLaunchBaseHandler(h) 00175 00176 # reusable parent class test 00177 def _test_ROSLaunchBaseHandler(self, h): 00178 pmon = self.pmon 00179 # - test get pid 00180 self.assertEquals(os.getpid(), self._succeed(h.get_pid())) 00181 00182 # - test list processes 00183 process_list = self._succeed(h.list_processes()) 00184 self.assertEquals([[], []], process_list) 00185 00186 p = ProcessMock('pack', 'name', [], {}) 00187 p.spawn_count = 1 00188 pmon.register(p) 00189 00190 process_list = self._succeed(h.list_processes()) 00191 self.assertEquals([('name', 1),], process_list[0]) 00192 self.assertEquals([], process_list[1]) 00193 00194 p.spawn_count = 2 00195 process_list = self._succeed(h.list_processes()) 00196 self.assertEquals([('name', 2),], process_list[0]) 00197 self.assertEquals([], process_list[1]) 00198 # - cleanup our work 00199 pmon.unregister(p) 00200 00201 # - test process_info 00202 code, msg, val = h.process_info('fubar') 00203 self.assertEquals(-1, code) 00204 self.assert_(type(msg) == str) 00205 self.assertEquals({}, val) 00206 00207 pmon.register(p) 00208 self.assertEquals(p.get_info(), self._succeed(h.process_info(p.name))) 00209 pmon.unregister(p) 00210 00211 # - test get_node_names 00212 # - reach into instance to get branch-complete 00213 h.pm = None 00214 code, msg, val = h.get_node_names() 00215 self.assertEquals(0, code) 00216 self.assert_(type(msg) == str) 00217 self.assertEquals([], val) 00218 h.pm = pmon 00219 00220 self.assertEquals(pmon.get_active_names(), self._succeed(h.get_node_names())) 00221 pmon.register(p) 00222 self.assertEquals(pmon.get_active_names(), self._succeed(h.get_node_names())) 00223 pmon.unregister(p) 00224 00225 def test_ROSLaunchParentHandler(self): 00226 from roslaunch.server import ROSLaunchParentHandler 00227 from roslaunch.server import ChildROSLaunchProcess 00228 try: 00229 ROSLaunchParentHandler(None, {}, []) 00230 self.fail("should not allow pm as None") 00231 except: pass 00232 00233 pmon = self.pmon 00234 child_processes = {} 00235 listeners = [] 00236 h = ROSLaunchParentHandler(pmon, child_processes, listeners) 00237 self.assertEquals(child_processes, h.child_processes) 00238 self.assertEquals(listeners, h.listeners) 00239 self._test_ROSLaunchBaseHandler(h) 00240 00241 from rosgraph_msgs.msg import Log 00242 h.log('client-1', Log.FATAL, "message") 00243 h.log('client-1', Log.ERROR, "message") 00244 h.log('client-1', Log.DEBUG, "message") 00245 h.log('client-1', Log.INFO, "started with pid 1234") 00246 00247 # test process_died 00248 # - no listeners 00249 self._succeed(h.process_died('dead1', -1)) 00250 # - well-behaved listener 00251 l = roslaunch.pmon.ProcessListener() 00252 listeners.append(l) 00253 self._succeed(h.process_died('dead2', -1)) 00254 # - bad listener with exception 00255 def bad(): 00256 raise Exception("haha") 00257 l.process_died = bad 00258 self._succeed(h.process_died('dead3', -1)) 00259 00260 # test register 00261 00262 # - verify clean slate with list_children 00263 self.assertEquals([], self._succeed(h.list_children())) 00264 00265 # - first register with unknown 00266 code, msg, val = h.register('client-unknown', 'http://unroutable:1234') 00267 self.assertEquals(-1, code) 00268 self.assertEquals([], self._succeed(h.list_children())) 00269 00270 # - now register with known 00271 uri = 'http://unroutable:1324' 00272 child_processes['client-1'] = ChildROSLaunchProcess('foo', [], {}) 00273 val = self._succeed(h.register('client-1', uri)) 00274 self.assert_(type(val) == int) 00275 self.assertEquals([uri], self._succeed(h.list_children())) 00276 00277 def test_ROSLaunchChildHandler(self): 00278 from roslaunch.server import ROSLaunchChildHandler 00279 pmon = self.pmon 00280 import roslib.params 00281 try: 00282 # if there is a core up, we have to use its run id 00283 run_id = roslib.params.get_param('/run_id') 00284 except: 00285 run_id = 'foo-%s'%time.time() 00286 name = 'foo-bob' 00287 server_uri = 'http://localhost:12345' 00288 try: 00289 h = ROSLaunchChildHandler(run_id, name, server_uri, None) 00290 self.fail("should not allow pm as None") 00291 except: pass 00292 try: 00293 h = ROSLaunchChildHandler(run_id, name, 'http://bad_uri:0', pmon) 00294 self.fail("should not allow bad uri") 00295 except: pass 00296 try: 00297 h = ROSLaunchChildHandler(run_id, name, None, pmon) 00298 self.fail("should not allow None server_uri") 00299 except: pass 00300 00301 h = ROSLaunchChildHandler(run_id, name, server_uri, pmon) 00302 self.assertEquals(run_id, h.run_id) 00303 self.assertEquals(name, h.name) 00304 self.assertEquals(server_uri, h.server_uri) 00305 self._test_ROSLaunchBaseHandler(h) 00306 00307 # test _log() 00308 from rosgraph_msgs.msg import Log 00309 h._log(Log.INFO, 'hello') 00310 00311 # test shutdown() 00312 # should uninitialize pm 00313 h.shutdown() 00314 self.assert_(pmon.is_shutdown) 00315 # - this check is mostly to make sure that the launch() call below will exit 00316 self.assert_(h.pm is None) 00317 code, msg, val = h.launch('<launch></launch>') 00318 self.assertEquals(0, code) 00319 00320 # TODO: actual launch. more difficult as we need a core 00321 00322 def test__ProcessListenerForwarder(self): 00323 from roslaunch.server import _ProcessListenerForwarder, ProcessListener 00324 00325 # test with bad logic first 00326 f = _ProcessListenerForwarder(None) 00327 # - should trap exceptions 00328 f.process_died("foo", -1) 00329 # test with actual listener 00330 l = MockProcessListener() 00331 f = _ProcessListenerForwarder(l) 00332 # - should run normally 00333 f.process_died("foo", -1) 00334 00335 self.assertEquals(l.process_name, 'foo') 00336 self.assertEquals(l.exit_code, -1) 00337 00338 def test_ROSLaunchNode(self): 00339 #exercise the basic ROSLaunchNode API 00340 from roslaunch.server import ROSLaunchNode 00341 00342 # - create a node with a handler 00343 handler = TestHandler(self.pmon) 00344 node = ROSLaunchNode(handler) 00345 00346 # - start the node 00347 node.start() 00348 self.assert_(node.uri) 00349 00350 # - call the ping API that we added 00351 s = ServerProxy(node.uri) 00352 test_val = 'test-%s'%time.time() 00353 s.ping(test_val) 00354 self.assertEquals(handler.pinged, test_val) 00355 00356 # - call the pid API 00357 code, msg, pid = s.get_pid() 00358 self.assertEquals(1, code) 00359 self.assert_(type(msg) == str) 00360 self.assertEquals(os.getpid(), pid) 00361 00362 # - shut it down 00363 node.shutdown('test done') 00364 00365 def test_ROSLaunchParentNode(self): 00366 from roslaunch.server import ROSLaunchParentNode 00367 from roslaunch.server import ChildROSLaunchProcess 00368 from roslaunch.config import ROSLaunchConfig 00369 from roslaunch.pmon import ProcessListener 00370 rosconfig = ROSLaunchConfig() 00371 try: 00372 ROSLaunchParentNode(rosconfig, None) 00373 self.fail("should not allow pm as None") 00374 except: pass 00375 pmon = self.pmon 00376 n = ROSLaunchParentNode(rosconfig, pmon) 00377 self.assertEquals(rosconfig, n.rosconfig) 00378 self.assertEquals([], n.listeners) 00379 self.assertEquals({}, n.child_processes) 00380 self.assertEquals(n.handler.listeners, n.listeners) 00381 self.assertEquals(n.handler.child_processes, n.child_processes) 00382 00383 # test add listener 00384 self.assertEquals(n.handler.listeners, n.listeners) 00385 l = ProcessListener() 00386 n.add_process_listener(l) 00387 self.assertEquals([l], n.listeners) 00388 self.assertEquals(n.handler.listeners, n.listeners) 00389 00390 # now, lets make some xmlrpc calls against it 00391 import roslaunch.config 00392 server = roslaunch.server.ROSLaunchParentNode(roslaunch.config.ROSLaunchConfig(), self.pmon) 00393 00394 # it's really dangerous for logging when both a parent and a 00395 # child are in the same process, so out of paranoia, clear the 00396 # logging handlers 00397 roslaunch.core.clear_printlog_handlers() 00398 roslaunch.core.clear_printerrlog_handlers() 00399 00400 # - register a fake child with the server so that it accepts registration from ROSLaunchChild 00401 child_name = 'child-%s'%time.time() 00402 child_proc = ChildROSLaunchProcess('foo', [], {}) 00403 server.add_child(child_name, child_proc) 00404 00405 try: 00406 server.start() 00407 self.assert_(server.uri, "server URI did not initialize") 00408 s = ServerProxy(server.uri) 00409 child_uri = 'http://fake-unroutable:1324' 00410 # - list children should be empty 00411 val = self._succeed(s.list_children()) 00412 self.assertEquals([], val) 00413 # - register 00414 val = self._succeed(s.register(child_name, child_uri)) 00415 self.assertEquals(1, val) 00416 # - list children 00417 val = self._succeed(s.list_children()) 00418 self.assertEquals([child_uri], val) 00419 finally: 00420 server.shutdown('test done') 00421 00422 00423 00424 def test_ROSLaunchChildNode(self): 00425 from roslaunch.server import ROSLaunchChildNode 00426 from roslaunch.server import ChildROSLaunchProcess 00427 pmon = self.pmon 00428 import roslib.params 00429 try: 00430 # if there is a core up, we have to use its run id 00431 run_id = roslib.params.get_param('/run_id') 00432 except: 00433 run_id = 'foo-%s'%time.time() 00434 name = 'foo-bob' 00435 server_uri = 'http://localhost:12345' 00436 try: 00437 ROSLaunchChildNode(run_id, name, server_uri, None) 00438 self.fail("should not allow pm as None") 00439 except: pass 00440 try: 00441 ROSLaunchChildNode(run_id, name, 'http://bad_uri:0', pmon) 00442 self.fail("should not allow bad uri") 00443 except: pass 00444 00445 n = ROSLaunchChildNode(run_id, name, server_uri, pmon) 00446 self.assertEquals(run_id, n.run_id) 00447 self.assertEquals(name, n.name) 00448 self.assertEquals(server_uri, n.server_uri) 00449 00450 # tests for actual registration with server 00451 import roslaunch.config 00452 server = roslaunch.server.ROSLaunchParentNode(roslaunch.config.ROSLaunchConfig(), self.pmon) 00453 # - register a fake child with the server so that it accepts registration from ROSLaunchChild 00454 child_proc = ChildROSLaunchProcess('foo', [], {}) 00455 server.add_child(name, child_proc) 00456 00457 try: 00458 server.start() 00459 self.assert_(server.uri, "server URI did not initialize") 00460 s = ServerProxy(server.uri) 00461 00462 print "SERVER STARTED" 00463 00464 # start the child 00465 n = ROSLaunchChildNode(run_id, name, server.uri, pmon) 00466 n.start() 00467 print "CHILD STARTED" 00468 self.assert_(n.uri, "child URI did not initialize") 00469 00470 # verify registration 00471 print "VERIFYING REGISTRATION" 00472 self.assertEquals(child_proc.uri, n.uri) 00473 child_uri = 'http://fake-unroutable:1324' 00474 # - list children 00475 val = self._succeed(s.list_children()) 00476 self.assertEquals([n.uri], val) 00477 finally: 00478 print "SHUTTING DOWN" 00479 n.shutdown('test done') 00480 server.shutdown('test done') 00481 00482 00483 if __name__ == '__main__': 00484 rostest.unitrun('test_roslaunch', sys.argv[0], TestRoslaunchServer, coverage_packages=['roslaunch.server']) 00485