#!/usr/bin/env python
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above
#    copyright notice, this list of conditions and the following
#    disclaimer in the documentation and/or other materials provided
#    with the distribution.
#  * Neither the name of Willow Garage, Inc. nor the names of its
#    contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$

import roslib; roslib.load_manifest('test_roslaunch')

import os, sys, unittest
import time

try:
    from xmlrpclib import ServerProxy
except ImportError:
    # Python 3 moved the module; keep the name available either way.
    from xmlrpc.client import ServerProxy

import rostest
import roslaunch.child
import roslaunch.server


## Fake RemoteProcess object
class ChildProcessMock(roslaunch.server.ChildROSLaunchProcess):
    """Child process stand-in that only records whether stop() was called."""

    def __init__(self, name, args=None, env=None):
        """
        @param name: process name, forwarded to the base class
        @param args: argument list forwarded to the base class; a fresh
            empty list is used when omitted
        @param env: environment dict forwarded to the base class; a fresh
            empty dict is used when omitted
        """
        # Build the containers per call so instances never share one
        # mutable default object.
        if args is None:
            args = []
        if env is None:
            env = {}
        super(ChildProcessMock, self).__init__(name, args, env)
        self.stopped = False

    def stop(self):
        """Record the stop request instead of stopping anything."""
        self.stopped = True


## Fake ProcessMonitor object
class ProcessMonitorMock(object):
    """In-memory process monitor: registrations are kept in plain lists and
    every lifecycle method is a no-op."""

    def __init__(self):
        self.core_procs = []
        self.procs = []
        self.listeners = []

    def join(self, timeout=0):
        pass

    def add_process_listener(self, l):
        self.listeners.append(l)

    def register(self, p):
        self.procs.append(p)

    def register_core_proc(self, p):
        self.core_procs.append(p)

    def registrations_complete(self):
        pass

    def unregister(self, p):
        self.procs.remove(p)

    def has_process(self, name):
        """@return: True if a registered (non-core) process has this name"""
        return any(p.name == name for p in self.procs)

    def get_process(self, name):
        """@return: first registered process with this name, or None"""
        for p in self.procs:
            if p.name == name:
                return p
        return None

    def has_main_thread_jobs(self):
        return False

    def do_main_thread_jobs(self):
        pass

    def kill_process(self, name):
        pass

    def shutdown(self):
        pass

    def get_active_names(self):
        return [p.name for p in self.procs]

    def get_process_names_with_spawn_count(self):
        """@return: [actives, deads], where actives is a list of
        (name, spawn_count) tuples and deads is always empty"""
        actives = [(p.name, p.spawn_count) for p in self.procs]
        deads = []
        return [actives, deads]

    def mainthread_spin_once(self):
        pass

    def mainthread_spin(self):
        pass

    def run(self):
        pass


## Test roslaunch.server
class TestRoslaunchChild(unittest.TestCase):

    def setUp(self):
        self.pmon = ProcessMonitorMock()
        import roslib.params
        try:
            # if there is a core up, we have to use its run id
            self.run_id = roslib.params.get_param('/run_id')
        except Exception:
            # no usable core: fall back to a unique, made-up run id
            self.run_id = 'foo-%s' % time.time()

    def test_RoslaunchChild(self):
        # this is mainly a code coverage test to try and make sure that we don't
        # have any uninitialized references, etc...

        from roslaunch.child import ROSLaunchChild

        name = 'child-%s' % time.time()
        server_uri = 'http://unroutable:1234'
        c = ROSLaunchChild(self.run_id, name, server_uri)
        self.assertEqual(self.run_id, c.run_id)
        self.assertEqual(name, c.name)
        self.assertEqual(server_uri, c.server_uri)
        # - this check tests our assumption about c's process monitor field
        self.assertEqual(None, c.pm)
        self.assertEqual(None, c.child_server)

        # should be a noop
        c.shutdown()

        # create a new child to test _start_pm() and shutdown()
        c = ROSLaunchChild(self.run_id, name, server_uri)

        # - test _start_pm and shutdown logic
        c._start_pm()
        self.assertTrue(c.pm is not None)
        c.shutdown()

        # create a new child to test run() with a fake process
        # monitor. this requires an actual parent server to be running

        import roslaunch.config
        server = roslaunch.server.ROSLaunchParentNode(roslaunch.config.ROSLaunchConfig(), self.pmon)
        # - register a fake child with the server so that it accepts registration from ROSLaunchChild
        server.add_child(name, ChildProcessMock('foo'))
        try:
            server.start()
            self.assertTrue(server.uri, "server URI did not initialize")

            c = ROSLaunchChild(self.run_id, name, server.uri)
            c.pm = self.pmon
            # - run should speed through
            c.run()
        finally:
            server.shutdown('test done')

        # one final test for code completeness: raise an exception during run()
        c = ROSLaunchChild(self.run_id, name, server_uri)

        def bad():
            raise Exception('haha')
        # - violate some encapsulation here just to make sure the exception happens
        c._start_pm = bad
        try:
            # this isn't really a correctness test, this just manually
            # tests that the exception is logged
            c.run()
        except Exception:
            # Deliberately ignored: only the code path inside run() matters
            # here. Ctrl-C / SystemExit are no longer swallowed.
            pass


def kill_parent(p, delay=1.0):
    """Sleep for `delay` seconds, then call p.shutdown().

    @param p: object exposing shutdown()
    @param delay: seconds to wait before shutting down
    """
    # delay execution so that whatever pmon method we're calling has time to enter
    time.sleep(delay)
    print("stopping parent")
    p.shutdown()


if __name__ == '__main__':
    rostest.unitrun('test_roslaunch', sys.argv[0], TestRoslaunchChild, coverage_packages=['roslaunch.child'])