Package roshlaunch :: Module remote
[frames] | no frames]

Source Code for Module roshlaunch.remote

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id: remote.py 7331 2009-12-15 22:45:49Z kwc $ 
 34   
 35  """ 
 36  Integrates roslaunch remote process launching capabilities. 
 37  """ 
 38   
 39  import logging 
 40  import socket 
 41  import sys 
 42  import time 
 43   
 44  import roslib.network as network 
 45   
 46  import roslaunch.config  
 47  import roslaunch.remoteprocess  
 48  from roslaunch.remoteprocess import SSHChildROSLaunchProcess 
 49  import roslaunch.runner 
 50  import roslaunch.server #ROSLaunchParentNode hidden dep 
 51  from roslaunch.core import RLException, setup_env, is_machine_local, printerrlog 
 52   
 53  #For using Log message level constants 
 54  from rosgraph_msgs.msg import Log 
 55   
 56  _CHILD_REGISTER_TIMEOUT = 10.0 #seconds. NOTE(review): not referenced by any code visible in this module; start_children() uses each machine's own `timeout` attribute instead -- confirm whether this constant is still needed.
 57   
 58       
class ROSRemoteRunner(roslaunch.runner.ROSRemoteRunnerIF):
    """
    Manages the running of remote roslaunch children
    """

    #ROSHTODO: get rid of rosconfig assumption
    def __init__(self, run_id, rosconfig, pm, server):
        """
        @param run_id: roslaunch run_id of this runner
        @type  run_id: str
        @param rosconfig: launch configuration
        @type  rosconfig: L{ROSConfig}
        @param pm: process monitor
        @type  pm: L{ProcessMonitor}
        @param server: roslaunch parent server
        @type  server: L{ROSLaunchParentNode}
        """
        self.run_id = run_id
        self.rosconfig = rosconfig
        self.server = server
        self.pm = pm
        self.logger = logging.getLogger('roslaunch.remote')
        self.listeners = []

        # populated by start_children()
        self.machine_list = []
        self.remote_processes = []
        # machine config key -> [Node]; rebuilt by launch_remote_nodes()
        self.remote_nodes = {}

    def add_process_listener(self, l):
        """
        Listen to events about remote processes dying. Not
        threadsafe. Must be called before processes started.
        @param l: ProcessListener
        @type  l: L{ProcessListener}
        """
        self.listeners.append(l)

    def _start_child(self, server_node_uri, machine, counter):
        """
        Start a single child roslaunch on a remote machine, register it
        with the process monitor and add it to the parent server.
        @param server_node_uri: URI of the roslaunch parent server
        @type  server_node_uri: str
        @param machine: machine to start the child on
        @type  machine: L{Machine}
        @param counter: number used to make the child name unique
        @type  counter: int
        @return: the started child process
        @rtype: L{SSHChildROSLaunchProcess}
        @raise RLException: if the child process fails to start
        """
        # generate a name for the machine. don't use config key as
        # it's too long to easily display
        name = "%s-%s" % (machine.address, counter)

        self.logger.info("remote[%s] starting roslaunch", name)
        print("remote[%s] starting roslaunch" % name)

        env_dict = setup_env(None, machine, self.rosconfig.master.uri)
        p = SSHChildROSLaunchProcess(self.run_id, name, server_node_uri, env_dict, machine)
        success = p.start()
        # registered with the process monitor even when start() reports failure
        self.pm.register(p)
        if not success:  # treat as fatal
            raise RLException("unable to start remote roslaunch child: %s" % name)
        self.server.add_child(name, p)
        return p

    def _remote_machine_table(self):
        """
        Build the table of unique non-local machines referenced by the
        nodes of the launch configuration.
        @return: machines keyed by their config key
        @rtype: {str: L{Machine}}
        """
        machines = {}
        for n in self.rosconfig.nodes:
            if not is_machine_local(n.machine):
                machines[n.machine.config_key()] = n.machine
        return machines

    def start_children(self):
        """
        Start the child roslaunch processes and wait for each of them
        to register (i.e. to have a uri). On success, machine_list and
        remote_processes are updated.
        @raise RLException: if the server URI is not set, a child dies
        while starting, or a child does not register within its
        machine's timeout
        """
        server_node_uri = self.server.uri
        if not server_node_uri:
            raise RLException("server URI is not initialized")

        # TODOXXX: _start_child() should not be determining the
        # process name

        # Build table of unique machines that we are going to launch on
        machines = self._remote_machine_table()

        # Launch child roslaunch processes on remote machines
        # - keep a list of procs so we can check for those that failed to launch
        procs = []
        for counter, key in enumerate(machines):
            procs.append(self._start_child(server_node_uri, machines[key], counter))

        # Wait for all children to call register() callback. The machines can have
        # non-uniform registration timeouts. We consider the failure to occur once
        # one of the machines has failed to meet its timeout.
        start_t = time.time()
        while True:
            pending = []
            for p in procs:
                if not p.is_alive():
                    raise RLException("remote roslaunch failed to launch: %s" % p.machine.name)
                elif not p.uri:
                    pending.append(p.machine)
            if not pending:
                break
            # timeout is the minimum of the remaining timeouts of the machines
            timeout_t = start_t + min([m.timeout for m in pending])
            if time.time() > timeout_t:
                break
            time.sleep(0.1)

        if pending:
            unregistered = '\n'.join([" * %s (timeout %ss)" % (m.name, m.timeout) for m in pending])
            template = (
                "The following roslaunch remote processes failed to register:\n"
                "%s\n"
                "\n"
                "If this is a network latency issue, you may wish to consider setting\n"
                "  <machine timeout=\"NUMBER OF SECONDS\" ... />\n"
                "in your launch")
            raise RLException(template % unregistered)

        # convert machine dictionary to a list
        self.machine_list = list(machines.values())
        # save a list of the remote processes
        self.remote_processes = procs

    def _assume_failed(self, nodes, failed):
        """
        Utility routine for logging/recording nodes that failed
        @param nodes: list of nodes that are assumed to have failed
        @type  nodes: [L{Node}]
        @param failed: list of names of nodes that have failed to extend
        @type  failed: [str]
        """
        str_nodes = ["%s/%s" % (n.package, n.type) for n in nodes]
        failed.extend(str_nodes)
        printerrlog("Launch of the following nodes most likely failed: %s" % ', '.join(str_nodes))

    def launch_remote_nodes(self):
        """
        Contact each child to launch remote nodes
        @return: names of the nodes that were launched and of the
        nodes that failed (or are assumed to have failed)
        @rtype: ([str], [str])
        """
        succeeded = []
        failed = []

        # initialize remote_nodes. we use the machine config key as
        # the key for the dictionary so that we can bin the nodes.
        self.remote_nodes = {}
        for m in self.machine_list:
            self.remote_nodes[m.config_key()] = []

        # build list of nodes that will be launched by machine
        nodes = [x for x in self.rosconfig.nodes if not is_machine_local(x.machine)]
        for n in nodes:
            self.remote_nodes[n.machine.config_key()].append(n)

        for child in self.remote_processes:
            nodes = self.remote_nodes[child.machine.config_key()]
            body = '\n'.join([n.to_remote_xml() for n in nodes])
            xml = '<launch>\n%s</launch>' % body

            api = child.getapi()
            # TODO: timeouts
            try:
                self.logger.debug("sending [%s] XML [\n%s\n]", child.uri, xml)
                code, msg, val = api.launch(xml)
                if code == 1:
                    c_succ, c_fail = val
                    succeeded.extend(c_succ)
                    failed.extend(c_fail)
                else:
                    printerrlog('error launching on [%s, uri %s]: %s' % (child.name, child.uri, msg))
                    self._assume_failed(nodes, failed)
            except socket.gaierror:
                # gaierror is a subclass of socket.error, so it must be
                # handled first or this branch can never run.
                # usually errno == -2. See #815.
                child_host, _ = network.parse_http_host_and_port(child.uri)
                printerrlog("Unable to contact remote roslaunch at [%s]. This is most likely due to a network misconfiguration with host lookups. Please make sure that you can contact '%s' from this machine" % (child.uri, child_host))
                self._assume_failed(nodes, failed)
            except socket.error as e:
                # socket errors are not always (errno, msg) pairs (e.g.
                # timeouts carry a single argument), so don't unpack
                # blindly: report the last argument, or the error itself.
                detail = e.args[-1] if e.args else e
                printerrlog('error launching on [%s, uri %s]: %s' % (child.name, child.uri, str(detail)))
                self._assume_failed(nodes, failed)
            except Exception as e:
                printerrlog('error launching on [%s, uri %s]: %s' % (child.name, child.uri, str(e)))
                self._assume_failed(nodes, failed)

        return succeeded, failed
236