Package roslaunch :: Module remote
[frames] | no frames]

Source Code for Module roslaunch.remote

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  """ 
 36  Integrates roslaunch remote process launching capabilities. 
 37  """ 
 38   
 39  import logging 
 40  import socket 
 41  import time 
 42   
 43  import rosgraph.network as network 
 44   
 45  import roslaunch.config  
 46  import roslaunch.remoteprocess  
 47  from roslaunch.remoteprocess import SSHChildROSLaunchProcess 
 48  import roslaunch.launch 
 49  import roslaunch.server #ROSLaunchParentNode hidden dep 
 50  from roslaunch.core import RLException, is_machine_local, printerrlog, printlog 
 51  from roslaunch.nodeprocess import DEFAULT_TIMEOUT_SIGINT, DEFAULT_TIMEOUT_SIGTERM 
 52   
 53  _CHILD_REGISTER_TIMEOUT = 10.0 #seconds 
 54       
55 -class ROSRemoteRunner(roslaunch.launch.ROSRemoteRunnerIF):
56 """ 57 Manages the running of remote roslaunch children 58 """ 59
60 - def __init__(self, run_id, rosconfig, pm, server, sigint_timeout=DEFAULT_TIMEOUT_SIGINT, sigterm_timeout=DEFAULT_TIMEOUT_SIGTERM):
61 """ 62 :param run_id: roslaunch run_id of this runner, ``str`` 63 :param config: launch configuration, ``ROSConfig`` 64 :param pm process monitor, ``ProcessMonitor`` 65 :param server: roslaunch parent server, ``ROSLaunchParentNode`` 66 :param sigint_timeout: The SIGINT timeout used when killing nodes (in seconds). 67 :type sigint_timeout: float 68 :param sigterm_timeout: The SIGTERM timeout used when killing nodes if SIGINT does not stop the node (in seconds). 69 :type sigterm_timeout: float 70 """ 71 self.run_id = run_id 72 self.rosconfig = rosconfig 73 self.server = server 74 self.pm = pm 75 self.sigint_timeout = sigint_timeout 76 self.sigterm_timeout = sigterm_timeout 77 self.logger = logging.getLogger('roslaunch.remote') 78 self.listeners = [] 79 80 self.machine_list = [] 81 self.remote_processes = []
82
83 - def add_process_listener(self, l):
84 """ 85 Listen to events about remote processes dying. Not 86 threadsafe. Must be called before processes started. 87 88 :param l: ProcessListener 89 """ 90 self.listeners.append(l)
91
92 - def _start_child(self, server_node_uri, machine, counter):
93 # generate a name for the machine. don't use config key as 94 # it's too long to easily display 95 name = "%s-%s"%(machine.address, counter) 96 97 self.logger.info("remote[%s] starting roslaunch", name) 98 printlog("remote[%s] starting roslaunch"%name) 99 100 p = SSHChildROSLaunchProcess(self.run_id, name, server_node_uri, machine, self.rosconfig.master.uri, 101 sigint_timeout=self.sigint_timeout, sigterm_timeout=self.sigterm_timeout) 102 success = p.start() 103 self.pm.register(p) 104 if not success: #treat as fatal 105 raise RLException("unable to start remote roslaunch child: %s"%name) 106 self.server.add_child(name, p) 107 return p
108
109 - def start_children(self):
110 """ 111 Start the child roslaunch processes 112 """ 113 server_node_uri = self.server.uri 114 if not server_node_uri: 115 raise RLException("server URI is not initialized") 116 117 # TODOXXX: break out table building code into a separate 118 # routine so we can unit test it _start_child() should not be 119 # determining the process name 120 121 # Build table of unique machines that we are going to launch on 122 machines = {} 123 for n in self.rosconfig.nodes: 124 if not is_machine_local(n.machine): 125 machines[n.machine.config_key()] = n.machine 126 127 # Launch child roslaunch processes on remote machines 128 counter = 0 129 # - keep a list of procs so we can check for those that failed to launch 130 procs = [] 131 for m in machines: 132 p = self._start_child(server_node_uri, machines[m], counter) 133 procs.append(p) 134 counter += 1 135 136 # Wait for all children to call register() callback. The machines can have 137 # non-uniform registration timeouts. We consider the failure to occur once 138 # one of the machines has failed to meet it's timeout. 139 start_t = time.time() 140 while True: 141 pending = [] 142 for p in procs: 143 if not p.is_alive(): 144 raise RLException("remote roslaunch failed to launch: %s"%p.machine.name) 145 elif not p.uri: 146 pending.append(p.machine) 147 if not pending: 148 break 149 # timeout is the minimum of the remaining timeouts of the machines 150 timeout_t = start_t + min([m.timeout for m in pending]) 151 if time.time() > timeout_t: 152 break 153 time.sleep(0.1) 154 if pending: 155 raise RLException( 156 """The following roslaunch remote processes failed to register: 157 %s 158 159 If this is a network latency issue, you may wish to consider setting 160 <machine timeout="NUMBER OF SECONDS" ... /> 161 in your launch"""%'\n'.join([" * %s (timeout %ss)"%(m.name, m.timeout) for m in pending])) 162 163 # convert machine dictionary to a list 164 self.machine_list = machines.values() 165 # save a list of the remote processes 166 self.remote_processes = procs
167 168
169 - def _assume_failed(self, nodes, failed):
170 """ 171 Utility routine for logging/recording nodes that failed 172 173 :param nodes: list of nodes that are assumed to have failed, ``Node`` 174 :param failed: list of names of nodes that have failed to extend, ``[str]`` 175 """ 176 str_nodes = ["%s/%s"%(n.package, n.type) for n in nodes] 177 failed.extend(str_nodes) 178 printerrlog("Launch of the following nodes most likely failed: %s"%', '.join(str_nodes))
179
180 - def launch_remote_nodes(self):
181 """ 182 Contact each child to launch remote nodes 183 """ 184 succeeded = [] 185 failed = [] 186 187 # initialize remote_nodes. we use the machine config key as 188 # the key for the dictionary so that we can bin the nodes. 189 self.remote_nodes = {} 190 for m in self.machine_list: 191 self.remote_nodes[m.config_key()] = [] 192 193 # build list of nodes that will be launched by machine 194 nodes = [x for x in self.rosconfig.nodes if not is_machine_local(x.machine)] 195 for n in nodes: 196 self.remote_nodes[n.machine.config_key()].append(n) 197 198 for child in self.remote_processes: 199 nodes = self.remote_nodes[child.machine.config_key()] 200 body = '\n'.join([n.to_remote_xml() for n in nodes]) 201 # #3799: force utf-8 encoding 202 xml = '<?xml version="1.0" encoding="utf-8"?>\n<launch>\n%s</launch>'%body 203 204 api = child.getapi() 205 # TODO: timeouts 206 try: 207 self.logger.debug("sending [%s] XML [\n%s\n]"%(child.uri, xml)) 208 code, msg, val = api.launch(xml) 209 if code == 1: 210 c_succ, c_fail = val 211 succeeded.extend(c_succ) 212 failed.extend(c_fail) 213 else: 214 printerrlog('error launching on [%s, uri %s]: %s'%(child.name, child.uri, msg)) 215 self._assume_failed(nodes, failed) 216 except socket.error as e: 217 errno, msg = e 218 printerrlog('error launching on [%s, uri %s]: %s'%(child.name, child.uri, str(msg))) 219 self._assume_failed(nodes, failed) 220 221 except socket.gaierror as e: 222 errno, msg = e 223 # usually errno == -2. See #815. 224 child_host, _ = network.parse_http_host_and_port(child.uri) 225 printerrlog("Unable to contact remote roslaunch at [%s]. This is most likely due to a network misconfiguration with host lookups. Please make sure that you can contact '%s' from this machine"%(child.uri, child_host)) 226 self._assume_failed(nodes, failed) 227 228 except Exception as e: 229 printerrlog('error launching on [%s, uri %s]: %s'%(child.name, child.uri, str(e))) 230 self._assume_failed(nodes, failed) 231 232 return succeeded, failed
233