Package roslaunch :: Module nodeprocess
[frames] | [no frames]

Source Code for Module roslaunch.nodeprocess

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  """ 
 36  Local process implementation for running and monitoring nodes. 
 37  """ 
 38   
 39  import os 
 40  import signal 
 41  import subprocess  
 42  import time 
 43  import traceback 
 44   
 45  import rospkg 
 46   
 47  from roslaunch.core import * 
 48  #from roslaunch.core import setup_env 
 49  from roslaunch.node_args import create_local_process_args 
 50  from roslaunch.pmon import Process, FatalProcessLaunch 
 51   
 52  from rosmaster.master_api import NUM_WORKERS 
 53   
 54  import logging 
 55  _logger = logging.getLogger("roslaunch") 
 56   
 57  _TIMEOUT_SIGINT  = 15.0 #seconds 
 58  _TIMEOUT_SIGTERM = 2.0 #seconds 
 59   
 60  _counter = 0 
61 -def _next_counter():
62 global _counter 63 _counter += 1 64 return _counter
65
def create_master_process(run_id, type_, ros_root, port, num_workers=NUM_WORKERS, timeout=None):
    """
    Launch a master

    @param run_id: run_id of launch (forwarded to L{LocalProcess})
    @type  run_id: str
    @param type_: name of master executable: Master.ROSMASTER, or the
    deprecated alias Master.ZENMASTER (both launch rosmaster)
    @type  type_: str
    @param ros_root: ROS_ROOT environment setting. Only logged: the master
    executable is located via the search path, not relative to ROS_ROOT.
    @type  ros_root: str
    @param port: port to launch master on, in the range 1-65535
    @type  port: int
    @param num_workers: number of worker threads.
    @type  num_workers: int
    @param timeout: socket timeout for connections. Omitted from the
    command line when None.
    @type  timeout: float
    @return: process wrapper for the master (required, output not logged)
    @rtype: L{LocalProcess}
    @raise RLException: if type_ or port is invalid
    """
    if port < 1 or port > 65535:
        raise RLException("invalid port assignment: %s" % port)

    _logger.info("create_master_process: %s, %s, %s, %s, %s", type_, ros_root, port, num_workers, timeout)
    # catkin/fuerte: no longer use ROS_ROOT-relative executables, search path instead
    master = type_
    # zenmaster is deprecated and aliased to rosmaster
    if type_ not in [Master.ROSMASTER, Master.ZENMASTER]:
        raise RLException("unknown master type: %s" % type_)

    package = 'rosmaster'
    args = [master, '--core', '-p', str(port), '-w', str(num_workers)]
    if timeout is not None:
        args += ['-t', str(timeout)]

    _logger.info("process[master]: launching with args [%s]", args)
    log_output = False
    # respawn is documented as a bool on LocalProcess; pass False rather than None.
    return LocalProcess(run_id, package, 'master', args, os.environ, log_output,
                        respawn=False, required=True)
99
def create_node_process(run_id, node, master_uri):
    """
    Factory for generating processes for launching local ROS
    nodes.

    Note: this function only builds the L{LocalProcess}; it does not itself
    register it with the L{ProcessMonitor} (presumably the caller does).

    @param run_id: run_id of launch
    @type  run_id: str
    @param node: node to launch. Node name must be assigned.
    @type  node: L{Node}
    @param master_uri: API URI for master node
    @type  master_uri: str
    @return: local process instance
    @rtype: L{LocalProcess}
    @raise RLException: if no machine has been selected for the node
    @raise ValueError: if the node name is not assigned
    @raise NodeParamsException: If the node's parameters are improperly
    specified (presumably raised by setup_env/create_local_process_args)
    """
    _logger.info("create_node_process: package[%s] type[%s] machine[%s] master_uri[%s]", node.package, node.type, node.machine, master_uri)
    # check input args
    machine = node.machine
    if machine is None:
        raise RLException("Internal error: no machine selected for node of type [%s/%s]" % (node.package, node.type))
    if not node.name:
        raise ValueError("node name must be assigned")

    # - setup env for process (vars must be strings for os.environ)
    env = setup_env(node, machine, master_uri)

    # Suffix the namespaced name with a module-wide counter so process names
    # stay unique even when two nodes resolve to the same namespaced name.
    name = "%s-%s" % (rosgraph.names.ns_join(node.namespace, node.name), _next_counter())
    if name[0] == '/':
        name = name[1:]

    _logger.info('process[%s]: env[%s]', name, env)

    args = create_local_process_args(node, machine)

    _logger.info('process[%s]: args[%s]', name, args)

    # default for node.output not set is 'log'
    log_output = node.output != 'screen'
    _logger.debug('process[%s]: returning LocalProcess wrapper', name)
    return LocalProcess(run_id, node.package, name, args, env, log_output,
                        respawn=node.respawn, respawn_delay=node.respawn_delay,
                        required=node.required, cwd=node.cwd)
149 150
class LocalProcess(Process):
    """
    Process launched on local machine
    """

    def __init__(self, run_id, package, name, args, env, log_output,
                 respawn=False, respawn_delay=0.0, required=False, cwd=None,
                 is_node=True):
        """
        @param run_id: unique run ID for this roslaunch. Used to
        generate log directory location. run_id may be None if this
        feature is not being used.
        @type  run_id: str
        @param package: name of package process is part of
        @type  package: str
        @param name: name of process
        @type  name: str
        @param args: list of arguments to process
        @type  args: [str]
        @param env: environment dictionary for process
        @type  env: {str : str}
        @param log_output: if True, log output streams of process
        @type  log_output: bool
        @param respawn: respawn process if it dies (default is False)
        @type  respawn: bool
        @param respawn_delay: respawn process after a delay
        @type  respawn_delay: float
        @param required: passed through to L{Process}; presumably marks the
        process as required by the launch (confirm in roslaunch.pmon)
        @type  required: bool
        @param cwd: working-directory selector used by start(): 'node'
        (directory of the executable), 'cwd' (current directory),
        'ros-root', or anything else / None for ROS_HOME
        @type  cwd: str
        @param is_node: (optional) if True, process is ROS node and accepts
        ROS node command-line arguments. Default: True
        @type  is_node: bool
        """
        super(LocalProcess, self).__init__(package, name, args, env,
                                           respawn, respawn_delay, required)
        self.run_id = run_id
        self.popen = None
        self.log_output = log_output
        self.started = False
        self.stopped = False
        self.cwd = cwd
        self.log_dir = None
        self.pid = -1
        self.is_node = is_node

    # NOTE: in the future, info() is going to have to be sufficient for relaunching a process
    def get_info(self):
        """
        Get all data about this process in dictionary form
        """
        info = super(LocalProcess, self).get_info()
        info['pid'] = self.pid
        if self.run_id:
            info['run_id'] = self.run_id
        info['log_output'] = self.log_output
        if self.cwd is not None:
            info['cwd'] = self.cwd
        return info

    def _configure_logging(self):
        """
        Configure logging of node's log file and stdout/stderr.

        Side effects: sets self.log_dir and, for ROS nodes, replaces any
        previous C{__log:=} argument in self.args with a fresh one.

        @return: stdout log file, stderr log file (open file objects, or
        None when that stream is not logged)
        @rtype: file, file
        @raise RLException: if the log directory cannot be created
        """
        log_dir = rospkg.get_log_dir(env=os.environ)
        if self.run_id:
            log_dir = os.path.join(log_dir, self.run_id)
        if not os.path.exists(log_dir):
            try:
                os.makedirs(log_dir)
            except OSError as e:
                if e.errno == 17 and os.path.isdir(log_dir):
                    # EEXIST: created by someone else between the exists()
                    # check and makedirs(); the directory is usable.
                    pass
                elif e.errno == 13:  # EACCES
                    raise RLException("unable to create directory for log file [%s].\nPlease check permissions." % log_dir)
                else:
                    raise RLException("unable to create directory for log file [%s]: %s" % (log_dir, e.strerror))
        # #973: save log dir for error messages
        self.log_dir = log_dir

        # send stdout/stderr to file. in the case of respawning, we have to
        # open in append mode
        # note: logfileerr: disabling in favor of stderr appearing in the console.
        # will likely reinstate once roserr/rosout is more properly used.
        logfileout = logfileerr = None
        logfname = self._log_name()

        if self.log_output:
            outf, errf = [os.path.join(log_dir, '%s-%s.log' % (logfname, n)) for n in ['stdout', 'stderr']]
            if self.respawn:
                mode = 'a'
            else:
                mode = 'w'
            logfileout = open(outf, mode)
            if is_child_mode():
                logfileerr = open(errf, mode)

        # #986: pass in logfile name to node
        if self.is_node:
            # #1595: on respawn, these keep appending
            self.args = _cleanup_remappings(self.args, '__log:=')
            self.args.append("__log:=%s" % os.path.join(log_dir, "%s.log" % (logfname)))

        return logfileout, logfileerr

    def start(self):
        """
        Start the process.

        @return: True if the process is running (or already exited with
        code 0) right after launch, False otherwise
        @rtype: bool
        @raise FatalProcessLaunch: if process cannot be started and it
        is not likely to ever succeed
        """
        super(LocalProcess, self).start()
        # Acquire before entering try so finally never releases an unheld lock
        # (same pattern as stop()).
        self.lock.acquire()
        try:
            if self.started:
                _logger.info("process[%s]: restarting os process", self.name)
            else:
                _logger.info("process[%s]: starting os process", self.name)
            self.started = self.stopped = False

            full_env = self.env

            # _configure_logging() can mutate self.args
            try:
                logfileout, logfileerr = self._configure_logging()
            except Exception as e:
                _logger.error(traceback.format_exc())
                printerrlog("[%s] ERROR: unable to configure logging [%s]" % (self.name, str(e)))
                # it's not safe to inherit from this process as
                # rostest changes stdout to a StringIO, which is not a
                # proper file.
                logfileout, logfileerr = subprocess.PIPE, subprocess.PIPE

            if self.cwd == 'node':
                cwd = os.path.dirname(self.args[0])
            elif self.cwd == 'cwd':
                cwd = os.getcwd()
            elif self.cwd == 'ros-root':
                cwd = get_ros_root()
            else:
                cwd = rospkg.get_ros_home()
                if not os.path.exists(cwd):
                    try:
                        os.makedirs(cwd)
                    except OSError:
                        # exist_ok=True
                        pass

            _logger.info("process[%s]: start w/ args [%s]", self.name, self.args)
            _logger.info("process[%s]: cwd will be [%s]", self.name, cwd)

            try:
                # os.setsid puts the child in its own process group so
                # _stop_unix can signal the whole group; it does not exist
                # on Windows, where no preexec hook is used.
                self.popen = subprocess.Popen(self.args, cwd=cwd, stdout=logfileout, stderr=logfileerr,
                                              env=full_env, close_fds=True,
                                              preexec_fn=getattr(os, 'setsid', None))
            except OSError as e:
                self.started = True  # must set so is_alive state is correct
                _logger.error("OSError(%d, %s)", e.errno, e.strerror)
                if e.errno == 8:  # Exec format error
                    raise FatalProcessLaunch("Unable to launch [%s]. \nIf it is a script, you may be missing a '#!' declaration at the top." % self.name)
                elif e.errno == 2:  # no such file or directory
                    raise FatalProcessLaunch("""Roslaunch got a '%s' error while attempting to run:

%s

Please make sure that all the executables in this command exist and have
executable permission. This is often caused by a bad launch-prefix.""" % (e.strerror, ' '.join(self.args)))
                else:
                    raise FatalProcessLaunch("unable to launch [%s]: %s" % (' '.join(self.args), e.strerror))
            finally:
                # The child holds its own duplicates of the log file
                # descriptors; close the parent's handles so repeated
                # (re)starts don't leave them open. PIPE/None have no close().
                for logfile in (logfileout, logfileerr):
                    if hasattr(logfile, 'close'):
                        logfile.close()

            self.started = True
            # Check that the process is either still running (poll returns
            # None) or that it completed successfully since when we
            # launched it above (poll returns the return code, 0).
            poll_result = self.popen.poll()
            if poll_result is None or poll_result == 0:
                self.pid = self.popen.pid
                printlog_bold("process[%s]: started with pid [%s]" % (self.name, self.pid))
                return True
            else:
                printerrlog("failed to start local process: %s" % (' '.join(self.args)))
                return False
        finally:
            self.lock.release()

    def _log_name(self):
        """Process name with '/' replaced by '-', usable as a log file name."""
        return self.name.replace('/', '-')

    def is_alive(self):
        """
        @return: True if process is still running
        @rtype: bool
        """
        if not self.started:  # not started yet: reported as alive
            return True
        if self.stopped or self.popen is None:
            if self.time_of_death is None:
                self.time_of_death = time.time()
            return False
        self.exit_code = self.popen.poll()
        if self.exit_code is not None:
            if self.time_of_death is None:
                self.time_of_death = time.time()
            return False
        return True

    def get_exit_description(self):
        """
        @return: human-readable description of exit state
        @rtype: str
        """
        if self.exit_code is None:
            output = 'process has died without exit code [pid %s, cmd %s].' % (self.pid, ' '.join(self.args))
        elif self.exit_code != 0:
            output = 'process has died [pid %s, exit code %s, cmd %s].' % (self.pid, self.exit_code, ' '.join(self.args))
        else:
            output = 'process has finished cleanly'

        if self.log_dir:
            # #973: include location of output location in message
            output += '\nlog file: %s*.log' % (os.path.join(self.log_dir, self._log_name()))
        return output

    def _stop_unix(self, errors):
        """
        UNIX implementation of process killing.

        Sends SIGINT to the process group, waits up to _TIMEOUT_SIGINT,
        escalates to SIGTERM (waiting up to _TIMEOUT_SIGTERM), then SIGKILL.

        @param errors: error messages. stop() will record messages into this list.
        @type  errors: [str]
        """
        self.exit_code = self.popen.poll()
        if self.exit_code is not None:
            _logger.debug("process[%s].stop(): process has already returned %s", self.name, self.exit_code)
            self.popen = None
            self.stopped = True
            return

        pid = self.popen.pid
        pgid = os.getpgid(pid)
        _logger.info("process[%s]: killing os process with pid[%s] pgid[%s]", self.name, pid, pgid)

        try:
            # Start with SIGINT and escalate from there.
            _logger.info("[%s] sending SIGINT to pgid [%s]", self.name, pgid)
            os.killpg(pgid, signal.SIGINT)
            _logger.info("[%s] sent SIGINT to pgid [%s]", self.name, pgid)
            timeout_t = time.time() + _TIMEOUT_SIGINT
            retcode = self.popen.poll()
            while time.time() < timeout_t and retcode is None:
                time.sleep(0.1)
                retcode = self.popen.poll()
            # Escalate non-responsive process
            if retcode is None:
                printerrlog("[%s] escalating to SIGTERM" % self.name)
                timeout_t = time.time() + _TIMEOUT_SIGTERM
                os.killpg(pgid, signal.SIGTERM)
                _logger.info("[%s] sent SIGTERM to pgid [%s]" % (self.name, pgid))
                retcode = self.popen.poll()
                while time.time() < timeout_t and retcode is None:
                    time.sleep(0.2)
                    _logger.debug('poll for retcode')
                    retcode = self.popen.poll()
                if retcode is None:
                    printerrlog("[%s] escalating to SIGKILL" % self.name)
                    errors.append("process[%s, pid %s]: required SIGKILL. May still be running." % (self.name, pid))
                    try:
                        os.killpg(pgid, signal.SIGKILL)
                        _logger.info("[%s] sent SIGKILL to pgid [%s]" % (self.name, pgid))
                        # #2096: don't block on SIGKILL, because this results in more orphaned processes overall
                        _logger.info("process[%s]: sent SIGKILL", self.name)
                    except OSError as e:
                        if e.args[0] == 3:  # ESRCH: process already gone
                            printerrlog("no [%s] process with pid [%s]" % (self.name, pid))
                        else:
                            printerrlog("errors shutting down [%s], see log for details" % self.name)
                            _logger.error(traceback.format_exc())
                else:
                    _logger.info("process[%s]: SIGTERM killed with return value %s", self.name, retcode)
            else:
                _logger.info("process[%s]: SIGINT killed with return value %s", self.name, retcode)

        finally:
            self.popen = None

    def _stop_win32(self, errors):
        """
        Win32 implementation of process killing. In part, refer to

          http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/347462

        Note that it doesn't work as completely as _stop_unix as it can't utilise
        group id's. This means that any program which forks children underneath it
        won't get caught by this kill mechanism.

        @param errors: error messages. stop() will record messages into this list.
        @type  errors: [str]
        """
        self.exit_code = self.popen.poll()
        if self.exit_code is not None:
            _logger.debug("process[%s].stop(): process has already returned %s", self.name, self.exit_code)
            self.popen = None
            self.stopped = True
            return

        pid = self.popen.pid
        _logger.info("process[%s]: killing os process/subprocesses with pid[%s]", self.name, pid)
        # windows has no group id's :(
        try:
            # Start with SIGINT and escalate from there.
            _logger.info("[%s] sending SIGINT to pgid [%s]", self.name, pid)
            os.kill(pid, signal.SIGINT)
            _logger.info("[%s] sent SIGINT to pgid [%s]", self.name, pid)
            timeout_t = time.time() + _TIMEOUT_SIGINT
            retcode = self.popen.poll()
            while time.time() < timeout_t and retcode is None:
                time.sleep(0.1)
                retcode = self.popen.poll()
            # Escalate non-responsive process
            if retcode is None:
                printerrlog("[%s] escalating to SIGTERM" % self.name)
                timeout_t = time.time() + _TIMEOUT_SIGTERM
                # os.killpg does not exist on Windows; signal the pid directly.
                os.kill(pid, signal.SIGTERM)
                _logger.info("[%s] sent SIGTERM to pid [%s]" % (self.name, pid))
                retcode = self.popen.poll()
                while time.time() < timeout_t and retcode is None:
                    time.sleep(0.2)
                    _logger.debug('poll for retcode')
                    retcode = self.popen.poll()
                if retcode is None:
                    printerrlog("[%s] escalating to SIGKILL" % self.name)
                    errors.append("process[%s, pid %s]: required SIGKILL. May still be running." % (self.name, pid))
                    try:
                        # signal.SIGKILL does not exist on Windows;
                        # Popen.kill() is the portable forced kill.
                        self.popen.kill()
                        _logger.info("[%s] sent SIGKILL to pid [%s]" % (self.name, pid))
                        # #2096: don't block on SIGKILL, because this results in more orphaned processes overall
                        _logger.info("process[%s]: sent SIGKILL", self.name)
                    except OSError as e:
                        if e.args[0] == 3:  # ESRCH: process already gone
                            printerrlog("no [%s] process with pid [%s]" % (self.name, pid))
                        else:
                            printerrlog("errors shutting down [%s], see log for details" % self.name)
                            _logger.error(traceback.format_exc())
                else:
                    _logger.info("process[%s]: SIGTERM killed with return value %s", self.name, retcode)
            else:
                _logger.info("process[%s]: SIGINT killed with return value %s", self.name, retcode)
        finally:
            self.popen = None

    def stop(self, errors=None):
        """
        Stop the process. Record any significant error messages in the errors parameter

        @param errors: error messages. stop() will record messages into this list.
        @type  errors: [str]
        """
        if errors is None:
            errors = []
        super(LocalProcess, self).stop(errors)
        self.lock.acquire()
        try:
            try:
                _logger.debug("process[%s].stop() starting", self.name)
                if self.popen is None:
                    _logger.debug("process[%s].stop(): popen is None, nothing to kill", self.name)
                    return
                if sys.platform in ['win32']:  # cygwin seems to be ok
                    self._stop_win32(errors)
                else:
                    self._stop_unix(errors)
            except:
                # Deliberately broad: stop() is best-effort, so any failure
                # while killing is logged rather than propagated.
                _logger.error("[%s] EXCEPTION %s", self.name, traceback.format_exc())
        finally:
            self.stopped = True
            self.lock.release()
# #1595
def _cleanup_remappings(args, prefix):
    """
    Strip every argument that begins with C{prefix} from C{args}.

    Arguments such as C{__log:=} are regenerated on each (re)start, so
    copies appended by a previous launch must be dropped first.

    @param args: argument list; modified in place
    @type  args: [str]
    @param prefix: prefix identifying the arguments to drop
    @type  prefix: str
    @return: the same list object, now without matching arguments
    @rtype: [str]
    """
    args[:] = [arg for arg in args if not arg.startswith(prefix)]
    return args
544