Package roslaunch :: Module nodeprocess
[frames] | no frames]

Source Code for Module roslaunch.nodeprocess

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  """ 
 36  Local process implementation for running and monitoring nodes. 
 37  """ 
 38   
 39  import os 
 40  import signal 
 41  import subprocess  
 42  import time 
 43  import traceback 
 44   
 45  import rospkg 
 46   
 47  from roslaunch.core import * 
 48  #from roslaunch.core import setup_env 
 49  from roslaunch.node_args import create_local_process_args 
 50  from roslaunch.pmon import Process, FatalProcessLaunch 
 51   
 52  from rosmaster.master_api import NUM_WORKERS 
 53   
 54  import logging 
 55  _logger = logging.getLogger("roslaunch") 
 56   
 57  _TIMEOUT_SIGINT  = 15.0 #seconds 
 58  _TIMEOUT_SIGTERM = 2.0 #seconds 
 59   
 60  _counter = 0 
61 -def _next_counter():
62 global _counter 63 _counter += 1 64 return _counter
65
def create_master_process(run_id, type_, ros_root, port, num_workers=NUM_WORKERS, timeout=None):
    """
    Build the L{LocalProcess} that launches a master.

    @param run_id: run_id of launch
    @type  run_id: str
    @param type_: name of master executable (Master.ROSMASTER, or the
    deprecated alias Master.ZENMASTER)
    @type  type_: str
    @param ros_root: ROS_ROOT environment setting. Only logged; the
    executable is located via the search path.
    @type  ros_root: str
    @param port: port to launch master on
    @type  port: int
    @param num_workers: number of worker threads.
    @type  num_workers: int
    @param timeout: socket timeout for connections.
    @type  timeout: float
    @return: process wrapper for the master (not yet started)
    @rtype: L{LocalProcess}
    @raise RLException: if type_ or port is invalid
    """
    if not (1 <= port <= 65535):
        raise RLException("invalid port assignment: %s"%port)

    _logger.info("create_master_process: %s, %s, %s, %s, %s", type_, ros_root, port, num_workers, timeout)

    # zenmaster is deprecated and aliased to rosmaster
    if type_ not in (Master.ROSMASTER, Master.ZENMASTER):
        raise RLException("unknown master typ_: %s"%type_)

    # catkin/fuerte: no longer use ROS_ROOT-relative executables, search path instead
    args = [type_, '--core', '-p', str(port), '-w', str(num_workers)]
    if timeout is not None:
        args.extend(['-t', str(timeout)])

    _logger.info("process[master]: launching with args [%s]", args)
    # the master's output is not redirected to log files; the trailing
    # None is the positional 'respawn' argument
    return LocalProcess(run_id, 'rosmaster', 'master', args, os.environ, False, None)
99
def create_node_process(run_id, node, master_uri):
    """
    Factory for generating processes for launching local ROS
    nodes. This function only constructs the L{LocalProcess}; it does
    not start it, and no registration with a process monitor happens
    here.

    @param run_id: run_id of launch
    @type  run_id: str
    @param node: node to launch. Node name must be assigned.
    @type  node: L{Node}
    @param master_uri: API URI for master node
    @type  master_uri: str
    @return: local process instance
    @rtype: L{LocalProcess}
    @raise RLException: if no machine has been selected for the node
    @raise ValueError: if the node has no name assigned
    @raise NodeParamsException: If the node's parameters are improperly specified
    """
    _logger.info("create_node_process: package[%s] type[%s] machine[%s] master_uri[%s]", node.package, node.type, node.machine, master_uri)
    # check input args
    machine = node.machine
    if machine is None:
        raise RLException("Internal error: no machine selected for node of type [%s/%s]"%(node.package, node.type))
    if not node.name:
        raise ValueError("node name must be assigned")

    # - setup env for process (vars must be strings for os.environ)
    env = setup_env(node, machine, master_uri)

    # we have to include the counter to prevent potential name
    # collisions between the two branches
    name = "%s-%s"%(rosgraph.names.ns_join(node.namespace, node.name), _next_counter())
    # process names are relative: drop the leading global-namespace slash
    if name[0] == '/':
        name = name[1:]

    _logger.info('process[%s]: env[%s]', name, env)

    args = create_local_process_args(node, machine)

    _logger.info('process[%s]: args[%s]', name, args)

    # default for node.output not set is 'log'
    log_output = node.output != 'screen'
    # the process name must be passed, otherwise the '%s' is logged literally
    _logger.debug('process[%s]: returning LocalProcess wrapper', name)
    return LocalProcess(run_id, node.package, name, args, env, log_output,
                        respawn=node.respawn, respawn_delay=node.respawn_delay,
                        required=node.required, cwd=node.cwd)
149 150
class LocalProcess(Process):
    """
    Process launched on local machine
    """

    def __init__(self, run_id, package, name, args, env, log_output,
                 respawn=False, respawn_delay=0.0, required=False, cwd=None,
                 is_node=True):
        """
        @param run_id: unique run ID for this roslaunch. Used to
          generate log directory location. run_id may be None if this
          feature is not being used.
        @type  run_id: str
        @param package: name of package process is part of
        @type  package: str
        @param name: name of process
        @type  name: str
        @param args: list of arguments to process
        @type  args: [str]
        @param env: environment dictionary for process
        @type  env: {str : str}
        @param log_output: if True, log output streams of process
        @type  log_output: bool
        @param respawn: respawn process if it dies (default is False)
        @type  respawn: bool
        @param respawn_delay: respawn process after a delay
        @type  respawn_delay: float
        @param required: forwarded unchanged to the L{Process} base class
        @type  required: bool
        @param cwd: working directory of process, or None. start()
          recognises 'node', 'cwd' and 'ros-root'; any other value
          selects the ROS home directory.
        @type  cwd: str
        @param is_node: (optional) if True, process is ROS node and accepts ROS node command-line arguments. Default: True
        @type  is_node: bool
        """
        super(LocalProcess, self).__init__(package, name, args, env,
                                           respawn, respawn_delay, required)
        self.run_id = run_id
        self.popen = None
        self.log_output = log_output
        self.started = False
        self.stopped = False
        self.cwd = cwd
        self.log_dir = None
        self.pid = -1
        self.is_node = is_node

    # NOTE: in the future, info() is going to have to be sufficient for relaunching a process
    def get_info(self):
        """
        Get all data about this process in dictionary form

        @return: the base class info extended with 'pid', 'log_output'
          and, when set, 'run_id' and 'cwd'
        @rtype: dict
        """
        info = super(LocalProcess, self).get_info()
        info['pid'] = self.pid
        if self.run_id:
            info['run_id'] = self.run_id
        info['log_output'] = self.log_output
        if self.cwd is not None:
            info['cwd'] = self.cwd
        return info

    def _configure_logging(self):
        """
        Configure logging of node's log file and stdout/stderr.

        Creates the log directory if necessary, records it in
        self.log_dir and, for ROS nodes, replaces any '__log:='
        argument in self.args with one pointing at the node log file.

        @return: open file objects for the stdout log and the stderr
          log. Values are None if stdout/stderr are not logged.
        @rtype: file, file
        @raise RLException: if the log directory cannot be created
        """
        log_dir = rospkg.get_log_dir(env=os.environ)
        if self.run_id:
            log_dir = os.path.join(log_dir, self.run_id)
        if not os.path.exists(log_dir):
            try:
                os.makedirs(log_dir)
            except OSError as e:
                if os.path.isdir(log_dir):
                    # something else created the directory between the
                    # exists() check and makedirs(); that is not an error
                    pass
                elif e.errno == 13:  # EACCES: permission denied
                    raise RLException("unable to create directory for log file [%s].\nPlease check permissions."%log_dir)
                else:
                    raise RLException("unable to create directory for log file [%s]: %s"%(log_dir, e.strerror))
        # #973: save log dir for error messages
        self.log_dir = log_dir

        # send stdout/stderr to file. in the case of respawning, we have to
        # open in append mode
        # note: logfileerr: disabling in favor of stderr appearing in the console.
        # will likely reinstate once roserr/rosout is more properly used.
        logfileout = logfileerr = None
        logfname = self._log_name()

        if self.log_output:
            outf, errf = [os.path.join(log_dir, '%s-%s.log'%(logfname, n)) for n in ['stdout', 'stderr']]
            if self.respawn:
                mode = 'a'
            else:
                mode = 'w'
            logfileout = open(outf, mode)
            # stderr is only captured to a file in child mode
            if is_child_mode():
                logfileerr = open(errf, mode)

        # #986: pass in logfile name to node
        if self.is_node:
            # #1595: on respawn, these keep appending
            self.args = _cleanup_remappings(self.args, '__log:=')
            self.args.append("__log:=%s"%os.path.join(log_dir, "%s.log"%(logfname)))

        return logfileout, logfileerr

    def start(self):
        """
        Start the process.

        @return: True if the process is running (or already exited
          with code 0) right after launch, False if it already exited
          with a non-zero code
        @rtype: bool
        @raise FatalProcessLaunch: if process cannot be started and it
          is not likely to ever succeed
        """
        super(LocalProcess, self).start()
        # acquire before entering the try block so that the finally
        # clause can never release a lock that was not obtained
        self.lock.acquire()
        try:
            if self.started:
                _logger.info("process[%s]: restarting os process", self.name)
            else:
                _logger.info("process[%s]: starting os process", self.name)
            self.started = self.stopped = False

            # _configure_logging() can mutate self.args
            try:
                logfileout, logfileerr = self._configure_logging()
            except Exception as e:
                _logger.error(traceback.format_exc())
                printerrlog("[%s] ERROR: unable to configure logging [%s]"%(self.name, str(e)))
                # it's not safe to inherit from this process as
                # rostest changes stdout to a StringIO, which is not a
                # proper file.
                logfileout, logfileerr = subprocess.PIPE, subprocess.PIPE

            if self.cwd == 'node':
                cwd = os.path.dirname(self.args[0])
            elif self.cwd == 'cwd':
                cwd = os.getcwd()
            elif self.cwd == 'ros-root':
                cwd = get_ros_root()
            else:
                cwd = rospkg.get_ros_home()

            _logger.info("process[%s]: start w/ args [%s]", self.name, self.args)
            _logger.info("process[%s]: cwd will be [%s]", self.name, cwd)

            # The child gets its own session (and process group) so that
            # _stop_unix() can signal the whole group. os.setsid only
            # exists on POSIX; without it, launch without a preexec hook.
            # close_fds is disabled in that case as well, because older
            # Python versions on Windows reject close_fds=True combined
            # with redirected stdout/stderr.
            preexec_fn = getattr(os, 'setsid', None)
            close_fds = preexec_fn is not None

            try:
                self.popen = subprocess.Popen(self.args, cwd=cwd, stdout=logfileout, stderr=logfileerr, env=self.env, close_fds=close_fds, preexec_fn=preexec_fn)
            except OSError as e:
                self.started = True # must set so is_alive state is correct
                _logger.error("OSError(%d, %s)", e.errno, e.strerror)
                if e.errno == 8: #Exec format error
                    raise FatalProcessLaunch("Unable to launch [%s]. \nIf it is a script, you may be missing a '#!' declaration at the top."%self.name)
                elif e.errno == 2: #no such file or directory
                    raise FatalProcessLaunch(
                        "Roslaunch got a '%s' error while attempting to run:\n"
                        "\n"
                        "%s\n"
                        "\n"
                        "Please make sure that all the executables in this command exist and have\n"
                        "executable permission. This is often caused by a bad launch-prefix."%(e.strerror, ' '.join(self.args)))
                else:
                    raise FatalProcessLaunch("unable to launch [%s]: %s"%(' '.join(self.args), e.strerror))

            self.started = True
            # Check that the process is either still running (poll returns
            # None) or that it completed successfully since when we
            # launched it above (poll returns the return code, 0).
            poll_result = self.popen.poll()
            if poll_result is None or poll_result == 0:
                self.pid = self.popen.pid
                printlog_bold("process[%s]: started with pid [%s]"%(self.name, self.pid))
                return True
            else:
                printerrlog("failed to start local process: %s"%(' '.join(self.args)))
                return False
        finally:
            self.lock.release()

    def _log_name(self):
        """
        @return: process name with '/' replaced by '-', used as the
          base of the log file names
        @rtype: str
        """
        return self.name.replace('/', '-')

    def is_alive(self):
        """
        Poll the OS process and record exit code / time of death.

        @return: True if process is still running. Also True if the
          process has not been started yet.
        @rtype: bool
        """
        if not self.started: #not started yet
            return True
        if self.stopped or self.popen is None:
            if self.time_of_death is None:
                self.time_of_death = time.time()
            return False
        self.exit_code = self.popen.poll()
        if self.exit_code is not None:
            if self.time_of_death is None:
                self.time_of_death = time.time()
            return False
        return True

    def get_exit_description(self):
        """
        @return: human-readable description of exit state
        @rtype: str
        """
        if self.exit_code is None:
            output = 'process has died without exit code [pid %s, cmd %s].'%(self.pid, ' '.join(self.args))
        elif self.exit_code != 0:
            output = 'process has died [pid %s, exit code %s, cmd %s].'%(self.pid, self.exit_code, ' '.join(self.args))
        else:
            output = 'process has finished cleanly'

        if self.log_dir:
            # #973: include location of output location in message
            output += '\nlog file: %s*.log'%(os.path.join(self.log_dir, self._log_name()))
        return output

    def _stop_unix(self, errors):
        """
        UNIX implementation of process killing. Signals the child's
        process group with SIGINT, then SIGTERM, then SIGKILL, waiting
        up to _TIMEOUT_SIGINT / _TIMEOUT_SIGTERM seconds between
        escalations.

        @param errors: error messages. stop() will record messages into this list.
        @type  errors: [str]
        """
        self.exit_code = self.popen.poll()
        if self.exit_code is not None:
            _logger.debug("process[%s].stop(): process has already returned %s", self.name, self.exit_code)
            self.popen = None
            self.stopped = True
            return

        pid = self.popen.pid
        pgid = os.getpgid(pid)
        _logger.info("process[%s]: killing os process with pid[%s] pgid[%s]", self.name, pid, pgid)

        try:
            # Start with SIGINT and escalate from there.
            _logger.info("[%s] sending SIGINT to pgid [%s]", self.name, pgid)
            os.killpg(pgid, signal.SIGINT)
            _logger.info("[%s] sent SIGINT to pgid [%s]", self.name, pgid)
            timeout_t = time.time() + _TIMEOUT_SIGINT
            retcode = self.popen.poll()
            while time.time() < timeout_t and retcode is None:
                time.sleep(0.1)
                retcode = self.popen.poll()
            if retcode is not None:
                _logger.info("process[%s]: SIGINT killed with return value %s", self.name, retcode)
                return

            # Escalate non-responsive process
            printerrlog("[%s] escalating to SIGTERM"%self.name)
            timeout_t = time.time() + _TIMEOUT_SIGTERM
            os.killpg(pgid, signal.SIGTERM)
            _logger.info("[%s] sent SIGTERM to pgid [%s]", self.name, pgid)
            retcode = self.popen.poll()
            while time.time() < timeout_t and retcode is None:
                time.sleep(0.2)
                _logger.debug('poll for retcode')
                retcode = self.popen.poll()
            if retcode is not None:
                _logger.info("process[%s]: SIGTERM killed with return value %s", self.name, retcode)
                return

            printerrlog("[%s] escalating to SIGKILL"%self.name)
            errors.append("process[%s, pid %s]: required SIGKILL. May still be running."%(self.name, pid))
            try:
                os.killpg(pgid, signal.SIGKILL)
                _logger.info("[%s] sent SIGKILL to pgid [%s]", self.name, pgid)
                # #2096: don't block on SIGKILL, because this results in more orphaned processes overall
                _logger.info("process[%s]: sent SIGKILL", self.name)
            except OSError as e:
                if e.args[0] == 3:  # ESRCH: no such process
                    printerrlog("no [%s] process with pid [%s]"%(self.name, pid))
                else:
                    printerrlog("errors shutting down [%s], see log for details"%self.name)
                    _logger.error(traceback.format_exc())
        finally:
            # runs on every exit path above, including the early returns
            self.popen = None

    def _stop_win32(self, errors):
        """
        Win32 implementation of process killing. In part, refer to

          http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/347462

        Note that it doesn't work as completely as _stop_unix as it can't utilise
        group id's. This means that any program which forks children underneath it
        won't get caught by this kill mechanism.

        Only the process itself is signalled: os.killpg() and
        signal.SIGKILL are not available on Windows, so escalation uses
        os.kill() and Popen.kill() instead.

        @param errors: error messages. stop() will record messages into this list.
        @type  errors: [str]
        """
        self.exit_code = self.popen.poll()
        if self.exit_code is not None:
            _logger.debug("process[%s].stop(): process has already returned %s", self.name, self.exit_code)
            self.popen = None
            self.stopped = True
            return

        pid = self.popen.pid
        _logger.info("process[%s]: killing os process/subprocesses with pid[%s]", self.name, pid)
        # windows has no group id's :(
        try:
            # Start with SIGINT and escalate from there.
            _logger.info("[%s] sending SIGINT to pid [%s]", self.name, pid)
            os.kill(pid, signal.SIGINT)
            _logger.info("[%s] sent SIGINT to pid [%s]", self.name, pid)
            timeout_t = time.time() + _TIMEOUT_SIGINT
            retcode = self.popen.poll()
            while time.time() < timeout_t and retcode is None:
                time.sleep(0.1)
                retcode = self.popen.poll()
            if retcode is not None:
                _logger.info("process[%s]: SIGINT killed with return value %s", self.name, retcode)
                return

            # Escalate non-responsive process
            printerrlog("[%s] escalating to SIGTERM"%self.name)
            timeout_t = time.time() + _TIMEOUT_SIGTERM
            os.kill(pid, signal.SIGTERM)
            _logger.info("[%s] sent SIGTERM to pid [%s]", self.name, pid)
            retcode = self.popen.poll()
            while time.time() < timeout_t and retcode is None:
                time.sleep(0.2)
                _logger.debug('poll for retcode')
                retcode = self.popen.poll()
            if retcode is not None:
                _logger.info("process[%s]: SIGTERM killed with return value %s", self.name, retcode)
                return

            printerrlog("[%s] escalating to SIGKILL"%self.name)
            errors.append("process[%s, pid %s]: required SIGKILL. May still be running."%(self.name, pid))
            try:
                # signal.SIGKILL is not defined on Windows; Popen.kill()
                # is the portable way to force termination
                self.popen.kill()
                _logger.info("[%s] sent SIGKILL to pid [%s]", self.name, pid)
                # #2096: don't block on SIGKILL, because this results in more orphaned processes overall
                _logger.info("process[%s]: sent SIGKILL", self.name)
            except OSError as e:
                if e.args[0] == 3:  # ESRCH: no such process
                    printerrlog("no [%s] process with pid [%s]"%(self.name, pid))
                else:
                    printerrlog("errors shutting down [%s], see log for details"%self.name)
                    _logger.error(traceback.format_exc())
        finally:
            # runs on every exit path above, including the early returns
            self.popen = None

    def stop(self, errors=None):
        """
        Stop the process. Record any significant error messages in the errors parameter

        @param errors: error messages. stop() will record messages into this list.
        @type  errors: [str]
        """
        # This module has no explicit 'import sys'; presumably the name
        # otherwise arrives through the roslaunch.core star-import.
        # Import it locally so stop() does not depend on that.
        import sys

        if errors is None:
            errors = []
        super(LocalProcess, self).stop(errors)
        self.lock.acquire()
        try:
            try:
                _logger.debug("process[%s].stop() starting", self.name)
                if self.popen is None:
                    _logger.debug("process[%s].stop(): popen is None, nothing to kill", self.name)
                    return
                if sys.platform in ['win32']: # cygwin seems to be ok
                    self._stop_win32(errors)
                else:
                    self._stop_unix(errors)
            except:
                # deliberately broad: shutdown is best-effort, so any
                # failure is logged rather than propagated
                _logger.error("[%s] EXCEPTION %s", self.name, traceback.format_exc())
        finally:
            self.stopped = True
            self.lock.release()
525 526 527 # #1595
528 -def _cleanup_remappings(args, prefix):
529 """ 530 Remove all instances of args that start with prefix. This is used 531 to remove args that were previously added (and are now being 532 regenerated due to respawning) 533 """ 534 existing_args = [a for a in args if a.startswith(prefix)] 535 for a in existing_args: 536 args.remove(a) 537 return args
538