Package roslaunch :: Module nodeprocess
[frames | no frames]

Source Code for Module roslaunch.nodeprocess

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id: nodeprocess.py 14283 2011-07-13 02:30:41Z kwc $ 
 34   
 35  """ 
 36  Local process implementation for running and monitoring nodes. 
 37  """ 
 38   
 39  import os 
 40  import sys 
 41  import signal 
 42  import socket 
 43  import subprocess  
 44  import time 
 45  import traceback 
 46   
 47  import roslib.rosenv  
 48  import roslib.network  
 49   
 50  from roslaunch.core import * 
 51  from roslaunch.node_args import create_local_process_env, create_local_process_args 
 52  from roslaunch.pmon import Process, FatalProcessLaunch 
 53   
 54  import logging 
 55  _logger = logging.getLogger("roslaunch") 
 56   
 57  _TIMEOUT_SIGINT  = 15.0 #seconds 
 58  _TIMEOUT_SIGTERM = 2.0 #seconds 
 59   
 60  _counter = 0 
61 -def _next_counter():
62 global _counter 63 _counter += 1 64 return _counter
65
def create_master_process(run_id, type_, ros_root, port):
    """
    Launch a master

    @param run_id: run_id of launch (forwarded to L{LocalProcess})
    @type  run_id: str
    @param type_: name of master executable. Must be
      Master.ROSMASTER or Master.ZENMASTER (deprecated alias of rosmaster).
    @type  type_: str
    @param ros_root: ROS_ROOT environment setting
    @type  ros_root: str
    @param port: port to launch master on
    @type  port: int
    @return: local process instance wrapping the master executable
    @rtype: L{LocalProcess}
    @raise RLException: if type_ or port is invalid
    """
    if port < 1 or port > 65535:
        raise RLException("invalid port assignment: %s"%port)

    _logger.info("create_master_process: %s, %s, %s", type_, ros_root, port)

    # zenmaster is deprecated and aliased to rosmaster
    if type_ not in [Master.ROSMASTER, Master.ZENMASTER]:
        raise RLException("unknown master type_: %s"%type_)

    package = 'rosmaster'
    master = os.path.join(ros_root, 'bin', type_)
    args = [master, '--core', '-p', str(port)]

    _logger.info("process[master]: launching with args [%s]", args)
    # the master's stdout/stderr are not redirected to log files
    log_output = False
    # respawn is passed as None (falsy), exactly as the positional
    # argument in the original call did
    return LocalProcess(run_id, package, 'master', args, os.environ, log_output, respawn=None)
92
def create_node_process(run_id, node, master_uri):
    """
    Factory for generating processes for launching local ROS
    nodes. Also registers the process with the L{ProcessMonitor} so that
    events can be generated when the process dies.

    @param run_id: run_id of launch
    @type  run_id: str
    @param node: node to launch. Node name must be assigned.
    @type  node: L{Node}
    @param master_uri: API URI for master node
    @type  master_uri: str
    @return: local process instance
    @rtype: L{LocalProcess}
    @raise NodeParamsException: If the node's parameters are improperly specified
    @raise RLException: if no machine has been selected for the node
    @raise ValueError: if the node has no name
    """
    # roslib.names is used below; import it explicitly rather than
    # relying on another module having loaded it as a side effect.
    import roslib.names

    _logger.info("create_node_process: package[%s] type[%s] machine[%s] master_uri[%s]", node.package, node.type, node.machine, master_uri)
    # check input args
    machine = node.machine
    if machine is None:
        raise RLException("Internal error: no machine selected for node of type [%s/%s]"%(node.package, node.type))
    if not node.name:
        raise ValueError("node name must be assigned")

    # - setup env for process (vars must be strings for os.environ)
    env = create_local_process_env(node, machine, master_uri)

    # we have to include the counter to prevent potential name
    # collisions between the two branches
    name = "%s-%s"%(roslib.names.ns_join(node.namespace, node.name), _next_counter())
    if name[0] == '/':
        name = name[1:]

    _logger.info('process[%s]: env[%s]', name, env)

    args = create_local_process_args(node, machine)

    _logger.info('process[%s]: args[%s]', name, args)

    # default for node.output not set is 'log'
    log_output = node.output != 'screen'
    _logger.debug('process[%s]: returning LocalProcess wrapper', name)
    return LocalProcess(run_id, node.package, name, args, env, log_output, respawn=node.respawn, required=node.required, cwd=node.cwd)
140 141
class LocalProcess(Process):
    """
    Process launched on local machine
    """

    def __init__(self, run_id, package, name, args, env, log_output, respawn=False, required=False, cwd=None, is_node=True):
        """
        @param run_id: unique run ID for this roslaunch. Used to
          generate log directory location. run_id may be None if this
          feature is not being used.
        @type  run_id: str
        @param package: name of package process is part of
        @type  package: str
        @param name: name of process
        @type  name: str
        @param args: list of arguments to process
        @type  args: [str]
        @param env: environment dictionary for process
        @type  env: {str : str}
        @param log_output: if True, log output streams of process
        @type  log_output: bool
        @param respawn: respawn process if it dies (default is False)
        @type  respawn: bool
        @param required: forwarded unchanged to L{Process} (default is False)
        @type  required: bool
        @param cwd: working directory selector for the process, or
          None. start() understands 'node', 'cwd' and 'ros-root'; any
          other value selects the ROS home directory.
        @type  cwd: str
        @param is_node: (optional) if True, process is ROS node and accepts ROS node command-line arguments. Default: True
        @type  is_node: bool
        """
        super(LocalProcess, self).__init__(package, name, args, env, respawn, required)
        self.run_id = run_id
        self.popen = None        # subprocess.Popen handle while the OS process is managed
        self.log_output = log_output
        self.started = False
        self.stopped = False
        self.cwd = cwd
        self.log_dir = None      # filled in by _configure_logging()
        self.pid = -1            # -1 until start() has launched the process
        self.is_node = is_node

    # NOTE: in the future, info() is going to have to be sufficient for relaunching a process
    def get_info(self):
        """
        Get all data about this process in dictionary form

        @return: the base class info extended with 'pid', plus
          'run_id'/'log_output' when a run_id is set and 'cwd' when a
          cwd is set
        @rtype: dict
        """
        info = super(LocalProcess, self).get_info()
        info['pid'] = self.pid
        if self.run_id:
            info['run_id'] = self.run_id
            info['log_output'] = self.log_output
        if self.cwd is not None:
            info['cwd'] = self.cwd
        return info

    def _configure_logging(self):
        """
        Configure logging of node's log file and stdout/stderr.
        Creates the log directory if needed, records it in
        self.log_dir and, for ROS nodes, rewrites the __log:= argument
        in self.args.

        @return: stdout log file, stderr log file. Values are None if
          stdout/stderr are not logged.
        @rtype: file, file
        @raise RLException: if the log directory cannot be created
        """
        import errno

        log_dir = roslib.rosenv.get_log_dir(env=os.environ)
        if self.run_id:
            log_dir = os.path.join(log_dir, self.run_id)
        if not os.path.exists(log_dir):
            try:
                os.makedirs(log_dir)
            except OSError as e:
                if e.errno == errno.EEXIST and os.path.isdir(log_dir):
                    # somebody else created the directory between the
                    # exists() check and makedirs(); that is fine
                    pass
                elif e.errno == errno.EACCES:
                    raise RLException("unable to create directory for log file [%s].\nPlease check permissions."%log_dir)
                else:
                    raise RLException("unable to create directory for log file [%s]: %s"%(log_dir, e.strerror))
        # #973: save log dir for error messages
        self.log_dir = log_dir

        # send stdout/stderr to file. in the case of respawning, we have to
        # open in append mode
        # note: logfileerr: disabling in favor of stderr appearing in the console.
        # will likely reinstate once roserr/rosout is more properly used.
        logfileout = logfileerr = None
        logfname = self._log_name()

        if self.log_output:
            outf, errf = [os.path.join(log_dir, '%s-%s.log'%(logfname, n)) for n in ['stdout', 'stderr']]
            if self.respawn:
                mode = 'a'
            else:
                mode = 'w'
            logfileout = open(outf, mode)
            # stderr is only captured to file when running in child mode
            if is_child_mode():
                logfileerr = open(errf, mode)

        # #986: pass in logfile name to node
        if self.is_node:
            # #1595: on respawn, these keep appending
            self.args = _cleanup_remappings(self.args, '__log:=')
            self.args.append("__log:=%s"%os.path.join(log_dir, "%s.log"%(logfname)))

        return logfileout, logfileerr

    def start(self):
        """
        Start the process.

        @return: True if the process is running (or already exited
          with code 0) right after launch, False otherwise
        @rtype: bool
        @raise FatalProcessLaunch: if process cannot be started and it
        is not likely to ever succeed
        """
        import errno

        super(LocalProcess, self).start()
        # acquire outside the try block so that the finally clause never
        # releases a lock this call failed to take
        self.lock.acquire()
        try:
            if self.started:
                _logger.info("process[%s]: restarting os process", self.name)
            else:
                _logger.info("process[%s]: starting os process", self.name)
            self.started = self.stopped = False

            full_env = self.env

            # _configure_logging() can mutate self.args
            try:
                logfileout, logfileerr = self._configure_logging()
            except Exception as e:
                _logger.error(traceback.format_exc())
                printerrlog("[%s] ERROR: unable to configure logging [%s]"%(self.name, str(e)))
                # it's not safe to inherit from this process as
                # rostest changes stdout to a StringIO, which is not a
                # proper file.
                logfileout, logfileerr = subprocess.PIPE, subprocess.PIPE

            if self.cwd == 'node':
                cwd = os.path.dirname(self.args[0])
            elif self.cwd == 'cwd':
                cwd = os.getcwd()
            elif self.cwd == 'ros-root':
                cwd = get_ros_root()
            else:
                cwd = roslib.rosenv.get_ros_home()

            _logger.info("process[%s]: start w/ args [%s]", self.name, self.args)
            _logger.info("process[%s]: cwd will be [%s]", self.name, cwd)

            try:
                # preexec_fn=os.setsid: the child runs os.setsid() before
                # exec; _stop_unix() later signals it through its process group
                self.popen = subprocess.Popen(self.args, cwd=cwd, stdout=logfileout, stderr=logfileerr, env=full_env, close_fds=True, preexec_fn=os.setsid)
            except OSError as e:
                self.started = True # must set so is_alive state is correct
                _logger.error("OSError(%d, %s)", e.errno, e.strerror)
                if e.errno == errno.ENOEXEC: #Exec format error
                    raise FatalProcessLaunch("Unable to launch [%s]. \nIf it is a script, you may be missing a '#!' declaration at the top."%self.name)
                elif e.errno == errno.ENOENT: #no such file or directory
                    raise FatalProcessLaunch("""Roslaunch got a '%s' error while attempting to run:

%s

Please make sure that all the executables in this command exist and have
executable permission. This is often caused by a bad launch-prefix."""%(e.strerror, ' '.join(self.args)))
                else:
                    raise FatalProcessLaunch("unable to launch [%s]: %s"%(' '.join(self.args), e.strerror))

            self.started = True
            # Check that the process is either still running (poll returns
            # None) or that it completed successfully since when we
            # launched it above (poll returns the return code, 0).
            poll_result = self.popen.poll()
            if poll_result is None or poll_result == 0:
                self.pid = self.popen.pid
                printlog_bold("process[%s]: started with pid [%s]"%(self.name, self.pid))
                return True
            else:
                printerrlog("failed to start local process: %s"%(' '.join(self.args)))
                return False
        finally:
            self.lock.release()

    def _log_name(self):
        """
        @return: process name with '/' replaced by '-', used as the
          base name of the log files
        @rtype: str
        """
        return self.name.replace('/', '-')

    def is_alive(self):
        """
        Also refreshes self.exit_code from the OS process when one is
        being managed.

        @return: True if process is still running. A process that has
          not been started yet is reported as alive.
        @rtype: bool
        """
        if not self.started: #not started yet
            return True
        if self.stopped or self.popen is None:
            return False
        self.exit_code = self.popen.poll()
        if self.exit_code is not None:
            return False
        return True

    def get_exit_description(self):
        """
        @return: human-readable description of exit state
        @rtype: str
        """
        # no exit code recorded: nothing more specific can be said
        if self.exit_code is None:
            return 'process has died'

        # #973: include location of output location in message
        if self.exit_code:
            if self.log_dir:
                return 'process has died [pid %s, exit code %s].\nlog files: %s*.log'%(self.pid, self.exit_code, os.path.join(self.log_dir, self._log_name()))
            else:
                return 'process has died [pid %s, exit code %s]'%(self.pid, self.exit_code)
        else:
            if self.log_dir:
                return 'process has finished cleanly.\nlog file: %s*.log'%(os.path.join(self.log_dir, self._log_name()))
            else:
                return 'process has finished cleanly'

    def _stop_unix(self, errors):
        """
        UNIX implementation of process killing. Signals the process
        group with SIGINT, then SIGTERM, then SIGKILL, waiting
        _TIMEOUT_SIGINT / _TIMEOUT_SIGTERM seconds between escalations.
        self.popen is always reset to None on exit.

        @param errors: error messages. stop() will record messages into this list.
        @type  errors: [str]
        """
        import errno

        self.exit_code = self.popen.poll()
        if self.exit_code is not None:
            _logger.debug("process[%s].stop(): process has already returned %s", self.name, self.exit_code)
            self.popen = None
            self.stopped = True
            return

        pid = self.popen.pid
        pgid = os.getpgid(pid)
        _logger.info("process[%s]: killing os process with pid[%s] pgid[%s]", self.name, pid, pgid)

        try:
            # Start with SIGINT and escalate from there.
            _logger.info("[%s] sending SIGINT to pgid [%s]", self.name, pgid)
            os.killpg(pgid, signal.SIGINT)
            _logger.info("[%s] sent SIGINT to pgid [%s]", self.name, pgid)
            timeout_t = time.time() + _TIMEOUT_SIGINT
            retcode = self.popen.poll()
            while time.time() < timeout_t and retcode is None:
                time.sleep(0.1)
                retcode = self.popen.poll()
            # Escalate non-responsive process
            if retcode is None:
                printerrlog("[%s] escalating to SIGTERM"%self.name)
                timeout_t = time.time() + _TIMEOUT_SIGTERM
                os.killpg(pgid, signal.SIGTERM)
                _logger.info("[%s] sent SIGTERM to pgid [%s]"%(self.name, pgid))
                retcode = self.popen.poll()
                while time.time() < timeout_t and retcode is None:
                    time.sleep(0.2)
                    _logger.debug('poll for retcode')
                    retcode = self.popen.poll()
                if retcode is None:
                    printerrlog("[%s] escalating to SIGKILL"%self.name)
                    errors.append("process[%s, pid %s]: required SIGKILL. May still be running."%(self.name, pid))
                    try:
                        os.killpg(pgid, signal.SIGKILL)
                        _logger.info("[%s] sent SIGKILL to pgid [%s]"%(self.name, pgid))
                        # #2096: don't block on SIGKILL, because this results in more orphaned processes overall
                        _logger.info("process[%s]: sent SIGKILL", self.name)
                    except OSError as e:
                        if e.errno == errno.ESRCH:
                            # no such process
                            printerrlog("no [%s] process with pid [%s]"%(self.name, pid))
                        else:
                            printerrlog("errors shutting down [%s], see log for details"%self.name)
                            _logger.error(traceback.format_exc())
                else:
                    _logger.info("process[%s]: SIGTERM killed with return value %s", self.name, retcode)
            else:
                _logger.info("process[%s]: SIGINT killed with return value %s", self.name, retcode)

        finally:
            self.popen = None

    def stop(self, errors=None):
        """
        Stop the process. Record any significant error messages in the errors parameter

        @param errors: error messages. stop() will record messages into this list.
        @type  errors: [str]
        """
        if errors is None:
            errors = []
        super(LocalProcess, self).stop(errors)
        self.lock.acquire()
        try:
            try:
                _logger.debug("process[%s].stop() starting", self.name)
                if self.popen is None:
                    _logger.debug("process[%s].stop(): popen is None, nothing to kill", self.name)
                    return
                #NOTE: currently POSIX-only. Need to add in Windows code once I have a test environment:
                # http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/347462
                self._stop_unix(errors)
            except:
                # anything raised while killing the process is logged
                # here instead of propagating to the caller
                _logger.error("[%s] EXCEPTION %s", self.name, traceback.format_exc())
        finally:
            self.stopped = True
            self.lock.release()
442 443 444 # #1595
def _cleanup_remappings(args, prefix):
    """
    Drop, in place, every entry of args that begins with prefix and
    return the same list. Used to discard arguments added on an
    earlier launch that are about to be regenerated for a respawn.

    @param args: argument list to prune (modified in place)
    @type  args: [str]
    @param prefix: leading text identifying the entries to drop
    @type  prefix: str
    @return: args, with matching entries removed
    @rtype: [str]
    """
    # sweep from the tail so deletions never shift an unvisited index
    for idx in range(len(args) - 1, -1, -1):
        if args[idx].startswith(prefix):
            del args[idx]
    return args
455