$search
00001 #!/usr/bin/env python 00002 # Software License Agreement (BSD License) 00003 # 00004 # Copyright (c) 2008, Willow Garage, Inc. 00005 # All rights reserved. 00006 # 00007 # Redistribution and use in source and binary forms, with or without 00008 # modification, are permitted provided that the following conditions 00009 # are met: 00010 # 00011 # * Redistributions of source code must retain the above copyright 00012 # notice, this list of conditions and the following disclaimer. 00013 # * Redistributions in binary form must reproduce the above 00014 # copyright notice, this list of conditions and the following 00015 # disclaimer in the documentation and/or other materials provided 00016 # with the distribution. 00017 # * Neither the name of Willow Garage, Inc. nor the names of its 00018 # contributors may be used to endorse or promote products derived 00019 # from this software without specific prior written permission. 00020 # 00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 # POSSIBILITY OF SUCH DAMAGE. 00033 # 00034 # Revision $Id$ 00035 00036 import roslib; roslib.load_manifest('test_roslaunch') 00037 00038 import os, sys, unittest 00039 import thread 00040 00041 import rostest 00042 import roslaunch.parent 00043 00044 00045 ## Fake Process object 00046 class ProcessMock(roslaunch.pmon.Process): 00047 def __init__(self, package, name, args, env, respawn=False): 00048 super(ProcessMock, self).__init__(package, name, args, env, respawn) 00049 self.stopped = False 00050 def stop(self): 00051 self.stopped = True 00052 00053 ## Fake ProcessMonitor object 00054 class ProcessMonitorMock(object): 00055 def __init__(self): 00056 self.core_procs = [] 00057 self.procs = [] 00058 self.listeners = [] 00059 self.is_shutdown = False 00060 00061 def join(self, timeout=0): 00062 pass 00063 00064 def add_process_listener(self, l): 00065 self.listeners.append(l) 00066 00067 def register(self, p): 00068 self.procs.append(p) 00069 00070 def register_core_proc(self, p): 00071 self.core_procs.append(p) 00072 00073 def registrations_complete(self): 00074 pass 00075 00076 def unregister(self, p): 00077 self.procs.remove(p) 00078 00079 def has_process(self, name): 00080 return len([p for p in self.procs if p.name == name]) > 0 00081 00082 def get_process(self, name): 00083 val = [p for p in self.procs if p.name == name] 00084 if val: 00085 return val[0] 00086 return None 00087 00088 def has_main_thread_jobs(self): 00089 return False 00090 00091 def do_main_thread_jobs(self): 00092 pass 00093 00094 def kill_process(self, name): 00095 pass 00096 00097 def shutdown(self): 00098 self.is_shutdown = True 00099 00100 def get_active_names(self): 00101 return [p.name for p in self.procs] 00102 00103 def get_process_names_with_spawn_count(self): 00104 actives = [(p.name, p.spawn_count) for p in self.procs] 00105 deads = [] 00106 return [actives, deads] 00107 00108 def mainthread_spin_once(self): 00109 pass 00110 00111 def mainthread_spin(self): 00112 pass 00113 00114 def run(self): 00115 pass 00116 00117 import time 00118 from xmlrpclib import ServerProxy 00119 ## Test roslaunch.server 00120 class TestRoslaunchParent(unittest.TestCase): 00121 00122 def setUp(self): 00123 self.pmon = ProcessMonitorMock() 00124 00125 00126 def test_ROSLaunchParent(self): 00127 try: 00128 self._subtest_ROSLaunchParent() 00129 finally: 00130 self.pmon.shutdown() 00131 00132 def _subtest_ROSLaunchParent(self): 00133 from roslaunch.parent import ROSLaunchParent 00134 pmon = self.pmon 00135 import roslib.params 00136 try: 00137 # if there is a core up, we have to use its run id 00138 run_id = roslib.params.get_param('/run_id') 00139 except: 00140 run_id = 'test-rl-parent-%s'%time.time() 00141 name = 'foo-bob' 00142 server_uri = 'http://localhost:12345' 00143 00144 p = ROSLaunchParent(run_id, [], is_core = True, port=None, local_only=False) 00145 self.assertEquals(run_id, p.run_id) 00146 self.assertEquals(True, p.is_core) 00147 self.assertEquals(False, p.local_only) 00148 00149 import roslib.packages 00150 rl_dir = roslib.packages.get_pkg_dir('roslaunch') 00151 rl_file = os.path.join(rl_dir, 'example.launch') 00152 self.assert_(os.path.isfile(rl_file)) 00153 00154 # validate load_config logic 00155 p = ROSLaunchParent(run_id, [rl_file], is_core = False, port=None, local_only=True) 00156 self.assertEquals(run_id, p.run_id) 00157 self.assertEquals(False, p.is_core) 00158 self.assertEquals(True, p.local_only) 00159 00160 self.assert_(p.config is None) 00161 p._load_config() 00162 self.assert_(p.config is not None) 00163 self.assert_(p.config.nodes) 00164 00165 # try again with port override 00166 p = ROSLaunchParent(run_id, [rl_file], is_core = False, port=11312, local_only=True) 00167 self.assertEquals(11312, p.port) 00168 self.assert_(p.config is None) 00169 p._load_config() 00170 # - make sure port got passed into master 00171 _, port = roslib.network.parse_http_host_and_port(p.config.master.uri) 00172 self.assertEquals(11312, port) 00173 00174 # try again with bad file 00175 p = ROSLaunchParent(run_id, ['non-existent-fake.launch']) 00176 self.assert_(p.config is None) 00177 try: 00178 p._load_config() 00179 self.fail("load config should have failed due to bad rl file") 00180 except roslaunch.core.RLException: pass 00181 00182 # try again with bad xml 00183 rl_dir = roslib.packages.get_pkg_dir('test_roslaunch') 00184 rl_file = os.path.join(rl_dir, 'test', 'xml', 'test-params-invalid-1.xml') 00185 self.assert_(os.path.isfile(rl_file)) 00186 p = ROSLaunchParent(run_id, [rl_file]) 00187 self.assert_(p.config is None) 00188 try: 00189 p._load_config() 00190 self.fail("load config should have failed due to bad rl file") 00191 except roslaunch.core.RLException: pass 00192 00193 # run an empty launch 00194 if 0: 00195 p = ROSLaunchParent(run_id, [], is_core = False, port=None, local_only=True) 00196 self.assertEquals(run_id, p.run_id) 00197 self.assertEquals(False, p.is_core) 00198 self.assertEquals(True, p.local_only) 00199 00200 thread.start_new_thread(kill_parent, (p,)) 00201 p.start() 00202 p.spin() 00203 00204 # Mess around with internal repr to get code coverage on _init_runner/_init_remote 00205 p = ROSLaunchParent(run_id, [], is_core = False, port=None, local_only=True) 00206 # no config, _init_runner/_init_remote/_start_server should fail 00207 for m in ['_init_runner', '_init_remote', '_start_server']: 00208 try: 00209 getattr(p, m)() 00210 self.fail('should have raised') 00211 except roslaunch.core.RLException: pass 00212 00213 # - initialize p.config 00214 p.config = roslaunch.config.ROSLaunchConfig() 00215 00216 # no pm, _init_runner/_init_remote/_start_server should fail 00217 for m in ['_init_runner', '_init_remote', '_start_server']: 00218 try: 00219 getattr(p, m)() 00220 self.fail('should have raised') 00221 except roslaunch.core.RLException: pass 00222 00223 # - initialize p.pm 00224 p.pm = pmon 00225 00226 for m in ['_init_runner', '_init_remote']: 00227 try: 00228 getattr(p, m)() 00229 self.fail('should have raised') 00230 except roslaunch.core.RLException: pass 00231 00232 from roslaunch.server import ROSLaunchParentNode 00233 p.server = ROSLaunchParentNode(p.config, pmon) 00234 p._init_runner() 00235 # roslaunch runner should be initialized 00236 self.assert_(p.runner is not None) 00237 00238 # test _init_remote 00239 p.local_only = True 00240 p._init_remote() 00241 p.local_only = False 00242 # - this violates many abstractions to do this 00243 def ftrue(): 00244 return True 00245 p.config.has_remote_nodes = ftrue 00246 p._init_remote() 00247 self.assert_(p.remote_runner is not None) 00248 00249 self.failIf(pmon.is_shutdown) 00250 p.shutdown() 00251 self.assert_(pmon.is_shutdown) 00252 00253 def kill_parent(p, delay=1.0): 00254 # delay execution so that whatever pmon method we're calling has time to enter 00255 import time 00256 time.sleep(delay) 00257 print "stopping parent" 00258 p.shutdown() 00259 00260 if __name__ == '__main__': 00261 rostest.unitrun('test_roslaunch', sys.argv[0], TestRoslaunchParent, coverage_packages=['roslaunch.parent']) 00262