$search
#!/usr/bin/env python
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above
#    copyright notice, this list of conditions and the following
#    disclaimer in the documentation and/or other materials provided
#    with the distribution.
#  * Neither the name of Willow Garage, Inc. nor the names of its
#    contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$

## Unit tests for roslaunch.pmon: the Process / DeadProcess data classes,
## the module-level start/shutdown helpers and the ProcessMonitor itself.
## The file targets Python 2 (it uses the 'thread' module).

import roslib; roslib.load_manifest('test_roslaunch')

import os
import sys
import unittest
import time
import thread

import rostest
import roslaunch.server
# roslaunch.pmon is referenced directly throughout this file (including at
# class-definition time below), so import it explicitly instead of depending
# on some other import having loaded it.
import roslaunch.pmon


## handy class for kill_pmon to check whether it was called
class Marker(object):
    def __init__(self):
        # flipped to True by mark(); never reset
        self.marked = False

    ## record that the owner of this marker had to intervene
    def mark(self):
        self.marked = True


## Fake ProcessMonitor object. It only keeps simple lists and flags so that
## roslaunch.pmon.shutdown_process_monitor() can be driven through its
## branches without starting a real monitor thread.
class ProcessMonitorMock(object):
    def __init__(self):
        self.core_procs = []
        self.procs = []
        self.listeners = []
        # value reported by isAlive(); tests set this directly
        self.alive = False
        self.is_shutdown = False

    ## @return bool: whatever the test stored in self.alive
    def isAlive(self):
        return self.alive

    ## no-op stand-in for Thread.join(); accepts and ignores any arguments
    def join(self, *args):
        return

    def add_process_listener(self, l):
        self.listeners.append(l)

    def register(self, p):
        self.procs.append(p)

    def register_core_proc(self, p):
        self.core_procs.append(p)

    def registrations_complete(self):
        pass

    ## @raise ValueError: if p was never registered (list.remove semantics)
    def unregister(self, p):
        self.procs.remove(p)

    ## @return bool: True if a process registered via register() has this name
    def has_process(self, name):
        return any(p.name == name for p in self.procs)

    ## Look up a process registered via register() by name.
    ## (Previously this called list.get() with an undefined variable and
    ## therefore always raised.)
    ## @return the first process whose name matches, or None if there is none
    def get_process(self, name):
        for p in self.procs:
            if p.name == name:
                return p
        return None

    def has_main_thread_jobs(self):
        return False

    def do_main_thread_jobs(self):
        pass

    def kill_process(self, name):
        pass

    def shutdown(self):
        pass

    ## @return [str]: names of the processes registered via register()
    def get_active_names(self):
        return [p.name for p in self.procs]

    ## @return [[(str, int)], []]: (name, spawn_count) pairs for the
    ## registered processes, followed by an always-empty "dead" list.
    ## (Previously the result was built but never returned.)
    def get_process_names_with_spawn_count(self):
        actives = [(p.name, p.spawn_count) for p in self.procs]
        deads = []
        return [actives, deads]

    def mainthread_spin_once(self):
        pass

    def mainthread_spin(self):
        pass

    def run(self):
        pass


## Process subclass that records whether stop() was called instead of
## doing any real work.
class ProcessMock(roslaunch.pmon.Process):
    def __init__(self, package, name, args, env, respawn=False):
        super(ProcessMock, self).__init__(package, name, args, env, respawn)
        self.stopped = False

    ## @param errors: ignored. Defaults to None so that the mock can also be
    ## stopped without arguments, the way this file calls Process.stop().
    def stop(self, errors=None):
        self.stopped = True


## Process mock that always reports itself dead and clears its respawn flag
## once start() has been called more than once.
class RespawnOnceProcessMock(ProcessMock):
    def __init__(self, package, name, args, env, respawn=False):
        # Name this class in super() so ProcessMock.__init__ runs too and
        # self.stopped is initialised (it was skipped before).
        super(RespawnOnceProcessMock, self).__init__(package, name, args, env, respawn)
        self.spawn_count = 0

    def is_alive(self):
        return False

    def start(self):
        self.spawn_count += 1
        if self.spawn_count > 1:
            self.respawn = False


## Test roslaunch.pmon
class TestRoslaunchPmon(unittest.TestCase):

    def setUp(self):
        # fresh, un-started monitor for every test
        self.pmon = roslaunch.pmon.ProcessMonitor()

    ## test all apis of Process instance. part coverage/sanity test
    ## @param p: freshly constructed Process under test
    ## @param package, name, args, env, respawn: values p is expected to expose
    def _test_Process(self, p, package, name, args, env, respawn):
        self.assertEqual(package, p.package)
        self.assertEqual(name, p.name)
        self.assertEqual(args, p.args)
        self.assertEqual(env, p.env)
        self.assertEqual(respawn, p.respawn)
        self.assertEqual(0, p.spawn_count)
        self.assertEqual(None, p.exit_code)
        self.assertTrue(p.get_exit_description())
        self.assertFalse(p.is_alive())

        info = p.get_info()
        self.assertEqual(package, info['package'])
        self.assertEqual(name, info['name'])
        self.assertEqual(args, info['args'])
        self.assertEqual(env, info['env'])
        self.assertEqual(respawn, info['respawn'])
        self.assertEqual(0, info['spawn_count'])
        self.assertFalse('exit_code' in info)

        # every start() is expected to bump the spawn count by one
        p.start()
        self.assertEqual(1, p.spawn_count)
        self.assertEqual(1, p.get_info()['spawn_count'])
        p.start()
        self.assertEqual(2, p.spawn_count)
        self.assertEqual(2, p.get_info()['spawn_count'])

        # noop
        p.stop()

        p.exit_code = 0
        self.assertEqual(0, p.get_info()['exit_code'])
        self.assertTrue('cleanly' in p.get_exit_description())
        p.exit_code = 1
        self.assertEqual(1, p.get_info()['exit_code'])
        self.assertTrue('[exit code 1]' in p.get_exit_description())

    ## tests to make sure that our Process base class has 100% coverage
    def test_Process(self):
        from roslaunch.pmon import Process
        # test constructor params; time-based values keep them unique per run
        package = 'foo-%s'%time.time()
        name = 'name-%s'%time.time()
        args = [time.time(), time.time(), time.time()]
        env = { 'key': time.time(), 'key2': time.time() }

        # default respawn, then explicit True and explicit False
        p = Process(package, name, args, env)
        self._test_Process(p, package, name, args, env, False)
        p = Process(package, name, args, env, True)
        self._test_Process(p, package, name, args, env, True)
        p = Process(package, name, args, env, False)
        self._test_Process(p, package, name, args, env, False)

    ## Check that a DeadProcess built from p0 copies p0's state and is not
    ## affected by later changes to p0.
    ## @param p0: live Process the DeadProcess is created from
    ## @param package, name, args, env, respawn: values p0 was built with
    def _test_DeadProcess(self, p0, package, name, args, env, respawn):
        from roslaunch.pmon import DeadProcess
        p0.exit_code = -1
        dp = DeadProcess(p0)
        self.assertEqual(package, dp.package)
        self.assertEqual(name, dp.name)
        self.assertEqual(args, dp.args)
        self.assertEqual(env, dp.env)
        self.assertEqual(respawn, dp.respawn)
        self.assertEqual(0, dp.spawn_count)
        self.assertEqual(-1, dp.exit_code)
        self.assertFalse(dp.is_alive())

        info = dp.get_info()
        info0 = p0.get_info()
        self.assertEqual(info0['package'], info['package'])
        self.assertEqual(info0['name'], info['name'])
        self.assertEqual(info0['args'], info['args'])
        self.assertEqual(info0['env'], info['env'])
        self.assertEqual(info0['respawn'], info['respawn'])
        self.assertEqual(0, info['spawn_count'])

        # self.fail() must live in the else-branch: inside the try block its
        # AssertionError was swallowed by the handler, so this check could
        # never fail.
        try:
            dp.start()
        except Exception:
            pass
        else:
            self.fail("should not be able to start a dead process")

        # info should be frozen
        p0.package = 'dead package'
        p0.name = 'dead name'
        p0.spawn_count = 1
        self.assertEqual(package, dp.package)
        self.assertEqual(name, dp.name)
        self.assertEqual(0, dp.spawn_count)
        self.assertEqual(package, dp.get_info()['package'])
        self.assertEqual(name, dp.get_info()['name'])
        self.assertEqual(0, dp.get_info()['spawn_count'])
        p0.start()
        self.assertEqual(0, dp.spawn_count)
        self.assertEqual(0, dp.get_info()['spawn_count'])

        # noop
        p0.stop()

    ## run _test_DeadProcess for default, True and False respawn values
    def test_DeadProcess(self):
        from roslaunch.pmon import Process
        # test constructor params
        package = 'foo-%s'%time.time()
        name = 'name-%s'%time.time()
        args = [time.time(), time.time(), time.time()]
        env = { 'key': time.time(), 'key2': time.time() }

        p = Process(package, name, args, env)
        self._test_DeadProcess(p, package, name, args, env, False)
        p = Process(package, name, args, env, True)
        self._test_DeadProcess(p, package, name, args, env, True)
        p = Process(package, name, args, env, False)
        self._test_DeadProcess(p, package, name, args, env, False)

    ## exercise start_process_monitor() / shutdown_process_monitor() with
    ## None, with a mock and with a real monitor
    def test_start_shutdown_process_monitor(self):
        def failer():
            raise Exception("oops")
        # noop
        self.assertFalse(roslaunch.pmon.shutdown_process_monitor(None))

        # test with fake pmon so we can get branch-complete
        pmon = ProcessMonitorMock()
        # - by setting alive to true, shutdown fails, though it can't really do anything about it
        pmon.alive = True
        self.assertFalse(roslaunch.pmon.shutdown_process_monitor(pmon))

        # make sure that exceptions get trapped
        pmon.shutdown = failer
        # should cause an exception during execution, but should get caught
        self.assertFalse(roslaunch.pmon.shutdown_process_monitor(pmon))

        # Test with a real process monitor
        pmon = roslaunch.pmon.start_process_monitor()
        self.assertTrue(pmon.isAlive())
        self.assertTrue(roslaunch.pmon.shutdown_process_monitor(pmon))
        self.assertFalse(pmon.isAlive())

        # fiddle around with some state that we normally shouldn't touch.
        # The flag is a module-level global, so put it back afterwards;
        # otherwise every test that runs later in the same process would see
        # the module as shutting down.
        saved_flag = roslaunch.pmon._shutting_down
        roslaunch.pmon._shutting_down = True
        try:
            pmon = roslaunch.pmon.start_process_monitor()
            if pmon is not None:
                self.assertFalse(roslaunch.pmon.shutdown_process_monitor(pmon))
                self.fail("start_process_monitor should fail if during shutdown sequence")
        finally:
            roslaunch.pmon._shutting_down = saved_flag

    ## pmon_shutdown() should stop every monitor created by start_process_monitor()
    def test_pmon_shutdown(self):
        # should be noop
        roslaunch.pmon.pmon_shutdown()

        # start two real process monitors and kill them
        # pmon_shutdown
        pmon1 = roslaunch.pmon.start_process_monitor()
        pmon2 = roslaunch.pmon.start_process_monitor()
        self.assertTrue(pmon1.isAlive())
        self.assertTrue(pmon2.isAlive())

        roslaunch.pmon.pmon_shutdown()

        self.assertFalse(pmon1.isAlive())
        self.assertFalse(pmon2.isAlive())

    def test_add_process_listener(self):
        # coverage test, not a functionality test as that would be much more difficult to simulate
        from roslaunch.pmon import ProcessListener
        l = ProcessListener()
        self.pmon.add_process_listener(l)

    ## kill_process() by name, then let run() reap the "dead" mocks
    def test_kill_process(self):
        from roslaunch.core import RLException
        pmon = self.pmon

        # should return False
        self.assertFalse(pmon.kill_process('foo'))

        p1 = ProcessMock('foo', 'name1', [], {})
        p2 = ProcessMock('bar', 'name2', [], {})
        pmon.register(p1)
        pmon.register(p2)
        self.assertFalse(p1.stopped)
        self.assertFalse(p2.stopped)
        self.assertTrue(p1.name in pmon.get_active_names())
        self.assertTrue(p2.name in pmon.get_active_names())

        # should fail as pmon API is string-based
        try:
            pmon.kill_process(p1)
        except RLException:
            pass
        else:
            self.fail("kill_process should have thrown RLException")

        self.assertTrue(pmon.kill_process(p1.name))
        self.assertTrue(p1.stopped)

        # - pmon should not have removed yet as the pmon thread cannot catch the death
        self.assertTrue(pmon.has_process(p1.name))
        self.assertTrue(p1.name in pmon.get_active_names())

        self.assertFalse(p2.stopped)
        self.assertTrue(p2.name in pmon.get_active_names())
        pmon.kill_process(p2.name)
        self.assertTrue(p2.stopped)

        # - pmon should not have removed yet as the pmon thread cannot catch the death
        self.assertTrue(pmon.has_process(p2.name))
        self.assertTrue(p2.name in pmon.get_active_names())

        p3 = ProcessMock('bar', 'name3', [], {})
        def bad(x):
            raise Exception("ha ha ha")
        p3.stop = bad
        pmon.register(p3)
        # make sure kill_process is safe
        pmon.kill_process(p3.name)

        def f():
            return False

        p1.is_alive = f
        p2.is_alive = f
        p3.is_alive = f

        # Now that we've 'killed' all the processes, we should be able
        # to run through the ProcessMonitor run loop and it should
        # exit.  But first, important that we check that pmon has no
        # other extra state in it
        self.assertEqual(3, len(pmon.get_active_names()))

        # put pmon into exitable state
        pmon.registrations_complete()

        # and run it -- but setup a safety timer to kill it if it doesn't exit
        marker = Marker()
        thread.start_new_thread(kill_pmon, (self.pmon, marker, 10.))

        pmon.run()

        self.assertFalse(marker.marked, "pmon had to be externally killed")

        self.assertTrue(pmon.done)
        self.assertFalse(pmon.has_process(p1.name))
        self.assertFalse(pmon.has_process(p2.name))
        alive, dead = pmon.get_process_names_with_spawn_count()
        self.assertFalse(alive)
        self.assertTrue((p1.name, p1.spawn_count) in dead)
        self.assertTrue((p2.name, p2.spawn_count) in dead)

    def test_run(self):
        # run is really hard to test...
        pmon = self.pmon

        # put pmon into exitable state
        pmon.registrations_complete()

        # give the pmon a process that raises an exception when it's
        # is_alive is checked. this should be marked as a dead process
        p1 = ProcessMock('bar', 'name1', [], {})
        def bad():
            raise Exception('ha ha')
        p1.is_alive = bad
        pmon.register(p1)

        # give pmon a well-behaved but dead process
        p2 = ProcessMock('bar', 'name2', [], {})
        def f():
            return False
        p2.is_alive = f
        pmon.register(p2)

        # give pmon a process that wants to respawn once
        p3 = RespawnOnceProcessMock('bar', 'name3', [], {})
        pmon.register(p3)

        # test assumptions about pmon's internal data structures
        # before we begin test
        self.assertTrue(p1 in pmon.procs)
        self.assertTrue(pmon._registrations_complete)
        self.assertFalse(pmon.is_shutdown)

        # and run it -- but setup a safety timer to kill it if it doesn't exit
        marker = Marker()
        thread.start_new_thread(kill_pmon, (self.pmon, marker, 10.))

        pmon.run()

        self.assertFalse(marker.marked, "pmon had to be externally killed")

        # retest assumptions
        self.assertFalse(pmon.procs)
        self.assertTrue(pmon.is_shutdown)

        # NOTE(review): the reason for clearing the flag again is not visible
        # in this file -- kept as in the original.
        pmon.is_shutdown = False

    def test_get_process_names_with_spawn_count(self):
        p1 = ProcessMock('foo', 'name1', [], {})
        p2 = ProcessMock('bar', 'name2', [], {})

        pmon = self.pmon
        self.assertEqual([[], []], pmon.get_process_names_with_spawn_count())
        pmon.register(p1)
        self.assertEqual([[('name1', 0),], []], pmon.get_process_names_with_spawn_count())
        pmon.register(p2)
        alive, dead = pmon.get_process_names_with_spawn_count()
        self.assertEqual([], dead)
        self.assertTrue(('name1', 0) in alive)
        self.assertTrue(('name2', 0) in alive)

        # arbitrary spawn counts must be reported as-is
        import random
        p1.spawn_count = random.randint(1, 10000)
        p2.spawn_count = random.randint(1, 10000)

        alive, dead = pmon.get_process_names_with_spawn_count()
        self.assertEqual([], dead)
        self.assertTrue((p1.name, p1.spawn_count) in alive)
        self.assertTrue((p2.name, p2.spawn_count) in alive)

        #TODO figure out how to test dead_list

    ## Tests ProcessMonitor.register(), unregister(), has_process(), and get_process()
    def test_registration(self):
        from roslaunch.core import RLException
        from roslaunch.pmon import Process
        pmon = self.pmon

        p1 = Process('foo', 'name1', [], {})
        p2 = Process('bar', 'name2', [], {})
        corep1 = Process('core', 'core1', [], {})
        corep2 = Process('core', 'core2', [], {})

        pmon.register(p1)
        self.assertTrue(pmon.has_process('name1'))
        self.assertEqual(p1, pmon.get_process('name1'))
        self.assertFalse(pmon.has_process('name2'))
        self.assertEqual(['name1'], pmon.get_active_names())
        try:
            pmon.register(Process('foo', p1.name, [], {}))
        except RLException:
            pass
        else:
            self.fail("should not allow duplicate process name")

        pmon.register(p2)
        self.assertTrue(pmon.has_process('name2'))
        self.assertEqual(p2, pmon.get_process('name2'))
        self.assertEqual(set(['name1', 'name2']), set(pmon.get_active_names()))

        pmon.register_core_proc(corep1)
        self.assertTrue(pmon.has_process('core1'))
        self.assertEqual(corep1, pmon.get_process('core1'))
        self.assertEqual(set(['name1', 'name2', 'core1']), set(pmon.get_active_names()))

        pmon.register_core_proc(corep2)
        self.assertTrue(pmon.has_process('core2'))
        self.assertEqual(corep2, pmon.get_process('core2'))
        self.assertEqual(set(['name1', 'name2', 'core1', 'core2']), set(pmon.get_active_names()))

        pmon.unregister(p2)
        self.assertFalse(pmon.has_process('name2'))
        pmon.unregister(p1)
        self.assertFalse(pmon.has_process('name1'))
        pmon.unregister(corep1)
        self.assertFalse(pmon.has_process('core1'))
        pmon.unregister(corep2)
        self.assertFalse(pmon.has_process('core2'))

        pmon.shutdown()
        try:
            pmon.register(Process('shutdown_fail', 'shutdown_fail', [], {}))
        except RLException:
            pass
        else:
            self.fail("registration should fail post-shutdown")

    def test_mainthread_spin_once(self):
        # shouldn't do anything
        self.pmon.done = False
        self.pmon.mainthread_spin_once()
        self.pmon.done = True
        self.pmon.mainthread_spin_once()

    def test_mainthread_spin(self):
        # can't test actual spin as that would go forever
        self.pmon.done = False
        thread.start_new_thread(kill_pmon, (self.pmon, Marker()))
        self.pmon.mainthread_spin()


## Safety timer body: after a delay, shut pmon down so that a test blocked
## in run()/mainthread_spin() can return.
## @param pmon: monitor to stop
## @param marker Marker: marked if pmon was not already shut down when the
## timer fired, i.e. the monitor had to be stopped from outside
## @param delay float: seconds to wait before acting
def kill_pmon(pmon, marker, delay=1.0):
    # delay execution so that whatever pmon method we're calling has time to enter
    time.sleep(delay)
    if not pmon.is_shutdown:
        marker.mark()
    # single-argument call form: prints the same text under Python 2 and 3
    print("stopping pmon")
    # pmon has two states that need to be set, as many of the tests do not start the actual process monitor
    pmon.shutdown()
    pmon.done = True


if __name__ == '__main__':
    rostest.unitrun('test_roslaunch', sys.argv[0], TestRoslaunchPmon, coverage_packages=['roslaunch.pmon'])