00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 import roslib; roslib.load_manifest('test_roslaunch')
00037
00038 import os, sys, unittest
00039 import time
00040
00041 import rostest
00042 import roslaunch.child
00043
00044 import roslaunch.server
00045
00046 class ChildProcessMock(roslaunch.server.ChildROSLaunchProcess):
00047 def __init__(self, name, args=[], env={}):
00048 super(ChildProcessMock, self).__init__(name, args, env)
00049 self.stopped = False
00050 def stop(self):
00051 self.stopped = True
00052
00053
00054 class ProcessMonitorMock(object):
00055 def __init__(self):
00056 self.core_procs = []
00057 self.procs = []
00058 self.listeners = []
00059
00060 def join(self, timeout=0):
00061 pass
00062
00063 def add_process_listener(self, l):
00064 self.listeners.append(l)
00065
00066 def register(self, p):
00067 self.procs.append(p)
00068
00069 def register_core_proc(self, p):
00070 self.core_procs.append(p)
00071
00072 def registrations_complete(self):
00073 pass
00074
00075 def unregister(self, p):
00076 self.procs.remove(p)
00077
00078 def has_process(self, name):
00079 return len([p for p in self.procs if p.name == name]) > 0
00080
00081 def get_process(self, name):
00082 val = [p for p in self.procs if p.name == name]
00083 if val:
00084 return val[0]
00085 return None
00086
00087 def has_main_thread_jobs(self):
00088 return False
00089
00090 def do_main_thread_jobs(self):
00091 pass
00092
00093 def kill_process(self, name):
00094 pass
00095
00096 def shutdown(self):
00097 pass
00098
00099 def get_active_names(self):
00100 return [p.name for p in self.procs]
00101
00102 def get_process_names_with_spawn_count(self):
00103 actives = [(p.name, p.spawn_count) for p in self.procs]
00104 deads = []
00105 return [actives, deads]
00106
00107 def mainthread_spin_once(self):
00108 pass
00109
00110 def mainthread_spin(self):
00111 pass
00112
00113 def run(self):
00114 pass
00115
00116 import time
00117 from xmlrpclib import ServerProxy
00118
00119 class TestRoslaunchChild(unittest.TestCase):
00120
00121 def setUp(self):
00122 self.pmon = ProcessMonitorMock()
00123 import roslib.params
00124 try:
00125
00126 self.run_id = roslib.params.get_param('/run_id')
00127 except:
00128 self.run_id = 'foo-%s'%time.time()
00129
00130 def test_RoslaunchChild(self):
00131
00132
00133
00134 from roslaunch.child import ROSLaunchChild
00135
00136 name = 'child-%s'%time.time()
00137 server_uri = 'http://unroutable:1234'
00138 c = ROSLaunchChild(self.run_id, name, server_uri)
00139 self.assertEquals(self.run_id, c.run_id)
00140 self.assertEquals(name, c.name)
00141 self.assertEquals(server_uri, c.server_uri)
00142
00143 self.assertEquals(None, c.pm)
00144 self.assertEquals(None, c.child_server)
00145
00146
00147 c.shutdown()
00148
00149
00150 c = ROSLaunchChild(self.run_id, name, server_uri)
00151
00152
00153 c._start_pm()
00154 self.assert_(c.pm is not None)
00155 c.shutdown()
00156
00157
00158
00159
00160 import roslaunch.config
00161 server = roslaunch.server.ROSLaunchParentNode(roslaunch.config.ROSLaunchConfig(), self.pmon)
00162
00163 server.add_child(name, ChildProcessMock('foo'))
00164 try:
00165 server.start()
00166 self.assert_(server.uri, "server URI did not initialize")
00167
00168 c = ROSLaunchChild(self.run_id, name, server.uri)
00169 c.pm = self.pmon
00170
00171 c.run()
00172 finally:
00173 server.shutdown('test done')
00174
00175
00176 c = ROSLaunchChild(self.run_id, name, server_uri)
00177 def bad():
00178 raise Exception('haha')
00179
00180 c._start_pm = bad
00181 try:
00182
00183
00184 c.run()
00185 except:
00186 pass
00187
00188
00189 def kill_parent(p, delay=1.0):
00190
00191 import time
00192 time.sleep(delay)
00193 print "stopping parent"
00194 p.shutdown()
00195
00196 if __name__ == '__main__':
00197 rostest.unitrun('test_roslaunch', sys.argv[0], TestRoslaunchChild, coverage_packages=['roslaunch.child'])
00198