Package roslaunch :: Module nodeprocess
[frames | no frames]

Source Code for Module roslaunch.nodeprocess

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  """ 
 36  Local process implementation for running and monitoring nodes. 
 37  """ 
 38   
 39  import os 
 40  import signal 
 41  import subprocess  
 42  import time 
 43  import traceback 
 44   
 45  import rospkg 
 46   
 47  from roslaunch.core import * 
 48  #from roslaunch.core import setup_env 
 49  from roslaunch.node_args import create_local_process_args 
 50  from roslaunch.pmon import Process, FatalProcessLaunch 
 51   
 52  import logging 
 53  _logger = logging.getLogger("roslaunch") 
 54   
 55  _TIMEOUT_SIGINT  = 15.0 #seconds 
 56  _TIMEOUT_SIGTERM = 2.0 #seconds 
 57   
 58  _counter = 0 
def _next_counter():
    """
    Advance the module-level process counter by one.

    @return: the counter value after the increment
    @rtype: int
    """
    global _counter
    _counter = _counter + 1
    return _counter
63
def create_master_process(run_id, type_, ros_root, port):
    """
    Build the L{LocalProcess} wrapper used to launch a master.

    @param run_id: run_id of launch, forwarded to L{LocalProcess}
    @type  run_id: str
    @param type_: name of master executable (Master.ROSMASTER or the
      deprecated alias Master.ZENMASTER)
    @type  type_: str
    @param ros_root: ROS_ROOT environment setting (only logged here)
    @type  ros_root: str
    @param port: port to launch master on
    @type  port: int
    @return: process wrapper for the master, named 'master'
    @rtype: L{LocalProcess}
    @raise RLException: if type_ or port is invalid
    """
    port_out_of_range = port < 1 or port > 65535
    if port_out_of_range:
        raise RLException("invalid port assignment: %s"%port)

    _logger.info("create_master_process: %s, %s, %s", type_, ros_root, port)

    # zenmaster is deprecated and aliased to rosmaster
    if type_ not in [Master.ROSMASTER, Master.ZENMASTER]:
        raise RLException("unknown master typ_: %s"%type_)

    # catkin/fuerte: no longer use ROS_ROOT-relative executables, the
    # executable name is resolved via the search path instead
    package = 'rosmaster'
    args = [type_, '--core', '-p', str(port)]

    _logger.info("process[master]: launching with args [%s]"%args)
    # master output is not redirected to log files; respawn is passed as None
    return LocalProcess(run_id, package, 'master', args, os.environ, False, None)
91
def create_node_process(run_id, node, master_uri):
    """
    Factory for generating processes for launching local ROS
    nodes. Also registers the process with the L{ProcessMonitor} so that
    events can be generated when the process dies.

    @param run_id: run_id of launch
    @type  run_id: str
    @param node: node to launch. Node name must be assigned.
    @type  node: L{Node}
    @param master_uri: API URI for master node
    @type  master_uri: str
    @return: local process instance
    @rtype: L{LocalProcess}
    @raise RLException: if no machine has been selected for the node
    @raise ValueError: if the node has no name assigned
    @raise NodeParamsException: If the node's parameters are improperly specified
    """
    _logger.info("create_node_process: package[%s] type[%s] machine[%s] master_uri[%s]", node.package, node.type, node.machine, master_uri)
    # check input args
    machine = node.machine
    if machine is None:
        raise RLException("Internal error: no machine selected for node of type [%s/%s]"%(node.package, node.type))
    if not node.name:
        raise ValueError("node name must be assigned")

    # - setup env for process (vars must be strings for os.environ)
    env = setup_env(node, machine, master_uri)

    # we have to include the counter to prevent potential name
    # collisions between the two branches
    name = "%s-%s"%(rosgraph.names.ns_join(node.namespace, node.name), _next_counter())
    # process names are stored without the leading namespace separator
    if name[0] == '/':
        name = name[1:]

    _logger.info('process[%s]: env[%s]', name, env)

    args = create_local_process_args(node, machine)

    _logger.info('process[%s]: args[%s]', name, args)

    # default for node.output not set is 'log'
    log_output = node.output != 'screen'
    # pass the process name so the '%s' placeholder is actually filled in
    _logger.debug('process[%s]: returning LocalProcess wrapper', name)
    return LocalProcess(run_id, node.package, name, args, env, log_output, respawn=node.respawn, required=node.required, cwd=node.cwd)
139 140
class LocalProcess(Process):
    """
    Process launched on local machine
    """

    def __init__(self, run_id, package, name, args, env, log_output, respawn=False, required=False, cwd=None, is_node=True):
        """
        @param run_id: unique run ID for this roslaunch. Used to
          generate log directory location. run_id may be None if this
          feature is not being used.
        @type  run_id: str
        @param package: name of package process is part of
        @type  package: str
        @param name: name of process
        @type  name: str
        @param args: list of arguments to process
        @type  args: [str]
        @param env: environment dictionary for process
        @type  env: {str : str}
        @param log_output: if True, log output streams of process
        @type  log_output: bool
        @param respawn: respawn process if it dies (default is False)
        @type  respawn: bool
        @param required: forwarded unchanged to the L{Process} base class
          (default is False)
        @type  required: bool
        @param cwd: working directory of process, or None. start()
          recognizes 'node', 'cwd' and 'ros-root'; any other value
          selects the ROS home directory.
        @type  cwd: str
        @param is_node: (optional) if True, process is ROS node and accepts ROS node command-line arguments. Default: True
        @type  is_node: bool
        """
        super(LocalProcess, self).__init__(package, name, args, env, respawn, required)
        self.run_id = run_id
        self.popen = None
        self.log_output = log_output
        self.started = False
        self.stopped = False
        self.cwd = cwd
        self.log_dir = None
        self.pid = -1
        self.is_node = is_node

    # NOTE: in the future, info() is going to have to be sufficient for relaunching a process
    def get_info(self):
        """
        Get all data about this process in dictionary form

        @return: the base class info extended with 'pid', 'log_output'
          and, when set, 'run_id' and 'cwd'
        """
        info = super(LocalProcess, self).get_info()
        info['pid'] = self.pid
        if self.run_id:
            info['run_id'] = self.run_id
        info['log_output'] = self.log_output
        if self.cwd is not None:
            info['cwd'] = self.cwd
        return info

    def _configure_logging(self):
        """
        Configure logging of node's log file and stdout/stderr.

        Creates the log directory if needed, records it in
        self.log_dir and, for nodes, replaces any existing '__log:='
        argument in self.args with one pointing into that directory.

        @return: open file objects for stdout and stderr logging.
          Values are None if stdout/stderr are not logged.
        @rtype: file, file
        @raise RLException: if the log directory cannot be created
        """
        log_dir = rospkg.get_log_dir(env=os.environ)
        if self.run_id:
            log_dir = os.path.join(log_dir, self.run_id)
        if not os.path.exists(log_dir):
            try:
                os.makedirs(log_dir)
            except OSError as e:
                if e.errno == 13:  # EACCES: permission denied
                    raise RLException("unable to create directory for log file [%s].\nPlease check permissions."%log_dir)
                else:
                    raise RLException("unable to create directory for log file [%s]: %s"%(log_dir, e.strerror))
        # #973: save log dir for error messages
        self.log_dir = log_dir

        # send stdout/stderr to file. in the case of respawning, we have to
        # open in append mode
        # note: logfileerr: disabling in favor of stderr appearing in the console.
        # will likely reinstate once roserr/rosout is more properly used.
        logfileout = logfileerr = None
        logfname = self._log_name()

        if self.log_output:
            outf, errf = [os.path.join(log_dir, '%s-%s.log'%(logfname, n)) for n in ['stdout', 'stderr']]
            if self.respawn:
                mode = 'a'
            else:
                mode = 'w'
            logfileout = open(outf, mode)
            # stderr is only redirected to a file in child mode
            if is_child_mode():
                logfileerr = open(errf, mode)

        # #986: pass in logfile name to node
        if self.is_node:
            # #1595: on respawn, these keep appending
            self.args = _cleanup_remappings(self.args, '__log:=')
            self.args.append("__log:=%s"%os.path.join(log_dir, "%s.log"%(logfname)))

        return logfileout, logfileerr

    def start(self):
        """
        Start the process.

        @return: True if the process is running (or already exited with
          code 0) right after launch, False otherwise
        @rtype: bool
        @raise FatalProcessLaunch: if process cannot be started and it
          is not likely to ever succeed
        """
        super(LocalProcess, self).start()
        # acquire outside the try so the finally never releases a lock
        # that was not obtained
        self.lock.acquire()
        try:
            if self.started:
                _logger.info("process[%s]: restarting os process", self.name)
            else:
                _logger.info("process[%s]: starting os process", self.name)
            self.started = self.stopped = False

            # _configure_logging() can mutate self.args
            try:
                logfileout, logfileerr = self._configure_logging()
            except Exception as e:
                _logger.error(traceback.format_exc())
                printerrlog("[%s] ERROR: unable to configure logging [%s]"%(self.name, str(e)))
                # it's not safe to inherit from this process as
                # rostest changes stdout to a StringIO, which is not a
                # proper file.
                logfileout, logfileerr = subprocess.PIPE, subprocess.PIPE

            if self.cwd == 'node':
                cwd = os.path.dirname(self.args[0])
            elif self.cwd == 'cwd':
                cwd = os.getcwd()
            elif self.cwd == 'ros-root':
                cwd = get_ros_root()
            else:
                cwd = rospkg.get_ros_home()

            _logger.info("process[%s]: start w/ args [%s]", self.name, self.args)
            _logger.info("process[%s]: cwd will be [%s]", self.name, cwd)

            # os.setsid does not exist on Windows, and Popen there does
            # not support preexec_fn nor (on older Pythons) close_fds
            # combined with redirected std handles.
            if sys.platform in ['win32']:
                preexec_function = None
                close_file_descriptor = False
            else:
                # new session, so stop can signal the whole process group
                preexec_function = os.setsid
                close_file_descriptor = True

            try:
                self.popen = subprocess.Popen(self.args, cwd=cwd, stdout=logfileout, stderr=logfileerr, env=self.env, close_fds=close_file_descriptor, preexec_fn=preexec_function)
            except OSError as e:
                self.started = True # must set so is_alive state is correct
                _logger.error("OSError(%d, %s)", e.errno, e.strerror)
                if e.errno == 8: #Exec format error
                    raise FatalProcessLaunch("Unable to launch [%s]. \nIf it is a script, you may be missing a '#!' declaration at the top."%self.name)
                elif e.errno == 2: #no such file or directory
                    raise FatalProcessLaunch("""Roslaunch got a '%s' error while attempting to run:

%s

Please make sure that all the executables in this command exist and have
executable permission. This is often caused by a bad launch-prefix."""%(e.strerror, ' '.join(self.args)))
                else:
                    raise FatalProcessLaunch("unable to launch [%s]: %s"%(' '.join(self.args), e.strerror))

            self.started = True
            # Check that the process is either still running (poll returns
            # None) or that it completed successfully since when we
            # launched it above (poll returns the return code, 0).
            poll_result = self.popen.poll()
            if poll_result is None or poll_result == 0:
                self.pid = self.popen.pid
                printlog_bold("process[%s]: started with pid [%s]"%(self.name, self.pid))
                return True
            else:
                printerrlog("failed to start local process: %s"%(' '.join(self.args)))
                return False
        finally:
            self.lock.release()

    def _log_name(self):
        """
        @return: process name with '/' replaced by '-', used as the
          base name of this process' log files
        @rtype: str
        """
        return self.name.replace('/', '-')

    def is_alive(self):
        """
        @return: True if process is still running. A process that has
          not been started yet is also reported as alive.
        @rtype: bool
        """
        if not self.started: #not started yet
            return True
        if self.stopped or self.popen is None:
            return False
        self.exit_code = self.popen.poll()
        if self.exit_code is not None:
            return False
        return True

    def get_exit_description(self):
        """
        @return: human-readable description of exit state
        @rtype: str
        """
        if self.exit_code is None:
            output = 'process has died without exit code [pid %s, cmd %s].'%(self.pid, ' '.join(self.args))
        elif self.exit_code != 0:
            output = 'process has died [pid %s, exit code %s, cmd %s].'%(self.pid, self.exit_code, ' '.join(self.args))
        else:
            output = 'process has finished cleanly'

        if self.log_dir:
            # #973: include location of output location in message
            output += '\nlog file: %s*.log'%(os.path.join(self.log_dir, self._log_name()))
        return output

    def _stop_unix(self, errors):
        """
        UNIX implementation of process killing. Signals the process
        group, escalating SIGINT -> SIGTERM -> SIGKILL when the process
        does not exit within the configured timeouts.

        @param errors: error messages. stop() will record messages into this list.
        @type  errors: [str]
        """
        self.exit_code = self.popen.poll()
        if self.exit_code is not None:
            _logger.debug("process[%s].stop(): process has already returned %s", self.name, self.exit_code)
            self.popen = None
            self.stopped = True
            return

        pid = self.popen.pid
        pgid = os.getpgid(pid)
        _logger.info("process[%s]: killing os process with pid[%s] pgid[%s]", self.name, pid, pgid)

        try:
            # Start with SIGINT and escalate from there.
            _logger.info("[%s] sending SIGINT to pgid [%s]", self.name, pgid)
            os.killpg(pgid, signal.SIGINT)
            _logger.info("[%s] sent SIGINT to pgid [%s]", self.name, pgid)
            timeout_t = time.time() + _TIMEOUT_SIGINT
            retcode = self.popen.poll()
            while time.time() < timeout_t and retcode is None:
                time.sleep(0.1)
                retcode = self.popen.poll()
            # Escalate non-responsive process
            if retcode is None:
                printerrlog("[%s] escalating to SIGTERM"%self.name)
                timeout_t = time.time() + _TIMEOUT_SIGTERM
                os.killpg(pgid, signal.SIGTERM)
                _logger.info("[%s] sent SIGTERM to pgid [%s]"%(self.name, pgid))
                retcode = self.popen.poll()
                while time.time() < timeout_t and retcode is None:
                    time.sleep(0.2)
                    _logger.debug('poll for retcode')
                    retcode = self.popen.poll()
                if retcode is None:
                    printerrlog("[%s] escalating to SIGKILL"%self.name)
                    errors.append("process[%s, pid %s]: required SIGKILL. May still be running."%(self.name, pid))
                    try:
                        os.killpg(pgid, signal.SIGKILL)
                        _logger.info("[%s] sent SIGKILL to pgid [%s]"%(self.name, pgid))
                        # #2096: don't block on SIGKILL, because this results in more orphaned processes overall
                        _logger.info("process[%s]: sent SIGKILL", self.name)
                    except OSError as e:
                        if e.args[0] == 3:  # ESRCH: no such process
                            printerrlog("no [%s] process with pid [%s]"%(self.name, pid))
                        else:
                            printerrlog("errors shutting down [%s], see log for details"%self.name)
                            _logger.error(traceback.format_exc())
                else:
                    _logger.info("process[%s]: SIGTERM killed with return value %s", self.name, retcode)
            else:
                _logger.info("process[%s]: SIGINT killed with return value %s", self.name, retcode)

        finally:
            self.popen = None

    def _stop_win32(self, errors):
        """
        Win32 implementation of process killing. In part, refer to

          http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/347462

        Note that it doesn't work as completely as _stop_unix as it can't utilise
        group id's. This means that any program which forks children underneath it
        won't get caught by this kill mechanism.

        @param errors: error messages. stop() will record messages into this list.
        @type  errors: [str]
        """
        self.exit_code = self.popen.poll()
        if self.exit_code is not None:
            _logger.debug("process[%s].stop(): process has already returned %s", self.name, self.exit_code)
            self.popen = None
            self.stopped = True
            return

        pid = self.popen.pid
        _logger.info("process[%s]: killing os process/subprocesses with pid[%s]", self.name, pid)
        # windows has no group id's :(
        try:
            # Start with SIGINT and escalate from there.
            _logger.info("[%s] sending SIGINT to pgid [%s]", self.name, pid)
            os.kill(pid, signal.SIGINT)
            _logger.info("[%s] sent SIGINT to pgid [%s]", self.name, pid)
            timeout_t = time.time() + _TIMEOUT_SIGINT
            retcode = self.popen.poll()
            while time.time() < timeout_t and retcode is None:
                time.sleep(0.1)
                retcode = self.popen.poll()
            # Escalate non-responsive process
            if retcode is None:
                printerrlog("[%s] escalating to SIGTERM"%self.name)
                timeout_t = time.time() + _TIMEOUT_SIGTERM
                # os.killpg is not available on Windows; signal the pid directly
                os.kill(pid, signal.SIGTERM)
                _logger.info("[%s] sent SIGTERM to pid [%s]"%(self.name, pid))
                retcode = self.popen.poll()
                while time.time() < timeout_t and retcode is None:
                    time.sleep(0.2)
                    _logger.debug('poll for retcode')
                    retcode = self.popen.poll()
                if retcode is None:
                    printerrlog("[%s] escalating to SIGKILL"%self.name)
                    errors.append("process[%s, pid %s]: required SIGKILL. May still be running."%(self.name, pid))
                    try:
                        # signal.SIGKILL is not defined on Windows; fall
                        # back to SIGTERM there
                        os.kill(pid, getattr(signal, 'SIGKILL', signal.SIGTERM))
                        _logger.info("[%s] sent SIGKILL to pid [%s]"%(self.name, pid))
                        # #2096: don't block on SIGKILL, because this results in more orphaned processes overall
                        _logger.info("process[%s]: sent SIGKILL", self.name)
                    except OSError as e:
                        if e.args[0] == 3:  # ESRCH: no such process
                            printerrlog("no [%s] process with pid [%s]"%(self.name, pid))
                        else:
                            printerrlog("errors shutting down [%s], see log for details"%self.name)
                            _logger.error(traceback.format_exc())
                else:
                    _logger.info("process[%s]: SIGTERM killed with return value %s", self.name, retcode)
            else:
                _logger.info("process[%s]: SIGINT killed with return value %s", self.name, retcode)
        finally:
            self.popen = None

    def stop(self, errors=None):
        """
        Stop the process. Record any significant error messages in the errors parameter

        @param errors: error messages. stop() will record messages into this list.
        @type  errors: [str]
        """
        if errors is None:
            errors = []
        super(LocalProcess, self).stop(errors)
        self.lock.acquire()
        try:
            try:
                _logger.debug("process[%s].stop() starting", self.name)
                if self.popen is None:
                    _logger.debug("process[%s].stop(): popen is None, nothing to kill", self.name)
                    return
                # NOTE(review): `sys` is not imported explicitly by this
                # module; presumably it arrives via the roslaunch.core
                # star import -- confirm.
                if sys.platform in ['win32']: # cygwin seems to be ok
                    self._stop_win32(errors)
                else:
                    self._stop_unix(errors)
            except:
                # deliberately broad: shutdown is best-effort and must
                # always reach the finally block below
                _logger.error("[%s] EXCEPTION %s", self.name, traceback.format_exc())
        finally:
            self.stopped = True
            self.lock.release()
506 507 508 # #1595
509 -def _cleanup_remappings(args, prefix):
510 """ 511 Remove all instances of args that start with prefix. This is used 512 to remove args that were previously added (and are now being 513 regenerated due to respawning) 514 """ 515 existing_args = [a for a in args if a.startswith(prefix)] 516 for a in existing_args: 517 args.remove(a) 518 return args
519