Package roslaunch :: Module nodeprocess
[frames] | no frames]

Source Code for Module roslaunch.nodeprocess

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  """ 
 36  Local process implementation for running and monitoring nodes. 
 37  """ 
 38   
 39  import errno 
 40  import os 
 41  import signal 
 42  import subprocess  
 43  import time 
 44  import traceback 
 45   
 46  import rospkg 
 47   
 48  from roslaunch.core import * 
 49  #from roslaunch.core import setup_env 
 50  from roslaunch.node_args import create_local_process_args 
 51  from roslaunch.pmon import Process, FatalProcessLaunch 
 52   
 53  from rosmaster.master_api import NUM_WORKERS 
 54   
 55  import logging 
 56  _logger = logging.getLogger("roslaunch") 
 57   
 58  DEFAULT_TIMEOUT_SIGINT  = 15.0 #seconds 
 59  DEFAULT_TIMEOUT_SIGTERM = 2.0 #seconds 
 60   
 61  _counter = 0 
62 -def _next_counter():
63 global _counter 64 _counter += 1 65 return _counter
66
67 -def create_master_process(run_id, type_, ros_root, port, num_workers=NUM_WORKERS, timeout=None, master_logger_level=False, sigint_timeout=DEFAULT_TIMEOUT_SIGINT, sigterm_timeout=DEFAULT_TIMEOUT_SIGTERM):
68 """ 69 Launch a master 70 @param type_: name of master executable (currently just Master.ZENMASTER) 71 @type type_: str 72 @param ros_root: ROS_ROOT environment setting 73 @type ros_root: str 74 @param port: port to launch master on 75 @type port: int 76 @param num_workers: number of worker threads. 77 @type num_workers: int 78 @param timeout: socket timeout for connections. 79 @type timeout: float 80 @param master_logger_level: rosmaster.master logger debug level 81 @type master_logger_level=: str or False 82 @param sigint_timeout: The SIGINT timeout used when killing nodes (in seconds). 83 @type sigint_timeout: float 84 @param sigterm_timeout: The SIGTERM timeout used when killing nodes if SIGINT does not stop the node (in seconds). 85 @type sigterm_timeout: float 86 @raise RLException: if type_ or port is invalid or sigint_timeout or sigterm_timeout are nonpositive. 87 """ 88 if port < 1 or port > 65535: 89 raise RLException("invalid port assignment: %s"%port) 90 91 _logger.info("create_master_process: %s, %s, %s, %s, %s, %s", type_, ros_root, port, num_workers, timeout, master_logger_level) 92 # catkin/fuerte: no longer use ROS_ROOT-relative executables, search path instead 93 master = type_ 94 # zenmaster is deprecated and aliased to rosmaster 95 if type_ in [Master.ROSMASTER, Master.ZENMASTER]: 96 package = 'rosmaster' 97 args = [master, '--core', '-p', str(port), '-w', str(num_workers)] 98 if timeout is not None: 99 args += ['-t', str(timeout)] 100 if master_logger_level: 101 args += ['--master-logger-level', str(master_logger_level)] 102 else: 103 raise RLException("unknown master typ_: %s"%type_) 104 105 _logger.info("process[master]: launching with args [%s]"%args) 106 log_output = False 107 return LocalProcess(run_id, package, 'master', args, os.environ, log_output, None, required=True, 108 sigint_timeout=sigint_timeout, sigterm_timeout=sigterm_timeout)
109
110 -def create_node_process(run_id, node, master_uri, sigint_timeout=DEFAULT_TIMEOUT_SIGINT, sigterm_timeout=DEFAULT_TIMEOUT_SIGTERM):
111 """ 112 Factory for generating processes for launching local ROS 113 nodes. Also registers the process with the L{ProcessMonitor} so that 114 events can be generated when the process dies. 115 116 @param run_id: run_id of launch 117 @type run_id: str 118 @param node: node to launch. Node name must be assigned. 119 @type node: L{Node} 120 @param master_uri: API URI for master node 121 @type master_uri: str 122 @param sigint_timeout: The SIGINT timeout used when killing nodes (in seconds). 123 @type sigint_timeout: float 124 @param sigterm_timeout: The SIGTERM timeout used when killing nodes if SIGINT does not stop the node (in seconds). 125 @type sigterm_timeout: float 126 @return: local process instance 127 @rtype: L{LocalProcess} 128 @raise NodeParamsException: If the node's parameters are improperly specific 129 @raise RLException: If sigint_timeout or sigterm_timeout are nonpositive. 130 """ 131 _logger.info("create_node_process: package[%s] type[%s] machine[%s] master_uri[%s]", node.package, node.type, node.machine, master_uri) 132 # check input args 133 machine = node.machine 134 if machine is None: 135 raise RLException("Internal error: no machine selected for node of type [%s/%s]"%(node.package, node.type)) 136 if not node.name: 137 raise ValueError("node name must be assigned") 138 139 # - setup env for process (vars must be strings for os.environ) 140 env = setup_env(node, machine, master_uri) 141 142 if not node.name: 143 raise ValueError("node name must be assigned") 144 145 # we have to include the counter to prevent potential name 146 # collisions between the two branches 147 148 name = "%s-%s"%(rosgraph.names.ns_join(node.namespace, node.name), _next_counter()) 149 if name[0] == '/': 150 name = name[1:] 151 152 _logger.info('process[%s]: env[%s]', name, env) 153 154 args = create_local_process_args(node, machine) 155 156 _logger.info('process[%s]: args[%s]', name, args) 157 158 # default for node.output not set is 'log' 159 log_output = node.output != 'screen' 160 _logger.debug('process[%s]: returning LocalProcess wrapper') 161 return LocalProcess(run_id, node.package, name, args, env, log_output, \ 162 respawn=node.respawn, respawn_delay=node.respawn_delay, \ 163 required=node.required, cwd=node.cwd, \ 164 sigint_timeout=sigint_timeout, sigterm_timeout=sigterm_timeout)
165 166
167 -class LocalProcess(Process):
168 """ 169 Process launched on local machine 170 """ 171
172 - def __init__(self, run_id, package, name, args, env, log_output, 173 respawn=False, respawn_delay=0.0, required=False, cwd=None, 174 is_node=True, sigint_timeout=DEFAULT_TIMEOUT_SIGINT, sigterm_timeout=DEFAULT_TIMEOUT_SIGTERM):
175 """ 176 @param run_id: unique run ID for this roslaunch. Used to 177 generate log directory location. run_id may be None if this 178 feature is not being used. 179 @type run_id: str 180 @param package: name of package process is part of 181 @type package: str 182 @param name: name of process 183 @type name: str 184 @param args: list of arguments to process 185 @type args: [str] 186 @param env: environment dictionary for process 187 @type env: {str : str} 188 @param log_output: if True, log output streams of process 189 @type log_output: bool 190 @param respawn: respawn process if it dies (default is False) 191 @type respawn: bool 192 @param respawn_delay: respawn process after a delay 193 @type respawn_delay: float 194 @param cwd: working directory of process, or None 195 @type cwd: str 196 @param is_node: (optional) if True, process is ROS node and accepts ROS node command-line arguments. Default: True 197 @type is_node: False 198 @param sigint_timeout: The SIGINT timeout used when killing nodes (in seconds). 199 @type sigint_timeout: float 200 @param sigterm_timeout: The SIGTERM timeout used when killing nodes if SIGINT does not stop the node (in seconds). 201 @type sigterm_timeout: float 202 @raise RLException: If sigint_timeout or sigterm_timeout are nonpositive. 203 """ 204 super(LocalProcess, self).__init__(package, name, args, env, 205 respawn, respawn_delay, required) 206 207 if sigint_timeout <= 0: 208 raise RLException("sigint_timeout must be a positive number, received %f" % sigint_timeout) 209 if sigterm_timeout <= 0: 210 raise RLException("sigterm_timeout must be a positive number, received %f" % sigterm_timeout) 211 212 self.run_id = run_id 213 self.popen = None 214 self.log_output = log_output 215 self.started = False 216 self.stopped = False 217 self.cwd = cwd 218 self.log_dir = None 219 self.pid = -1 220 self.is_node = is_node 221 self.sigint_timeout = sigint_timeout 222 self.sigterm_timeout = sigterm_timeout
223 224 # NOTE: in the future, info() is going to have to be sufficient for relaunching a process
225 - def get_info(self):
226 """ 227 Get all data about this process in dictionary form 228 """ 229 info = super(LocalProcess, self).get_info() 230 info['pid'] = self.pid 231 if self.run_id: 232 info['run_id'] = self.run_id 233 info['log_output'] = self.log_output 234 if self.cwd is not None: 235 info['cwd'] = self.cwd 236 return info
237
238 - def _configure_logging(self):
239 """ 240 Configure logging of node's log file and stdout/stderr 241 @return: stdout log file name, stderr log file 242 name. Values are None if stdout/stderr are not logged. 243 @rtype: str, str 244 """ 245 log_dir = rospkg.get_log_dir(env=os.environ) 246 if self.run_id: 247 log_dir = os.path.join(log_dir, self.run_id) 248 if not os.path.exists(log_dir): 249 try: 250 os.makedirs(log_dir) 251 except OSError as e: 252 if e.errno == errno.EACCES: 253 raise RLException("unable to create directory for log file [%s].\nPlease check permissions."%log_dir) 254 else: 255 raise RLException("unable to create directory for log file [%s]: %s"%(log_dir, e.strerror)) 256 # #973: save log dir for error messages 257 self.log_dir = log_dir 258 259 # send stdout/stderr to file. in the case of respawning, we have to 260 # open in append mode 261 # note: logfileerr: disabling in favor of stderr appearing in the console. 262 # will likely reinstate once roserr/rosout is more properly used. 263 logfileout = logfileerr = None 264 logfname = self._log_name() 265 266 if self.log_output: 267 outf, errf = [os.path.join(log_dir, '%s-%s.log'%(logfname, n)) for n in ['stdout', 'stderr']] 268 if self.respawn: 269 mode = 'a' 270 else: 271 mode = 'w' 272 logfileout = open(outf, mode) 273 if is_child_mode(): 274 logfileerr = open(errf, mode) 275 276 # #986: pass in logfile name to node 277 node_log_file = log_dir 278 if self.is_node: 279 # #1595: on respawn, these keep appending 280 self.args = _cleanup_remappings(self.args, '__log:=') 281 self.args.append("__log:=%s"%os.path.join(log_dir, "%s.log"%(logfname))) 282 283 return logfileout, logfileerr
284
285 - def start(self):
286 """ 287 Start the process. 288 289 @raise FatalProcessLaunch: if process cannot be started and it 290 is not likely to ever succeed 291 """ 292 super(LocalProcess, self).start() 293 try: 294 self.lock.acquire() 295 if self.started: 296 _logger.info("process[%s]: restarting os process", self.name) 297 else: 298 _logger.info("process[%s]: starting os process", self.name) 299 self.started = self.stopped = False 300 301 full_env = self.env 302 303 # _configure_logging() can mutate self.args 304 try: 305 logfileout, logfileerr = self._configure_logging() 306 except Exception as e: 307 _logger.error(traceback.format_exc()) 308 printerrlog("[%s] ERROR: unable to configure logging [%s]"%(self.name, str(e))) 309 # it's not safe to inherit from this process as 310 # rostest changes stdout to a StringIO, which is not a 311 # proper file. 312 logfileout, logfileerr = subprocess.PIPE, subprocess.PIPE 313 314 if self.cwd == 'node': 315 cwd = os.path.dirname(self.args[0]) 316 elif self.cwd == 'cwd': 317 cwd = os.getcwd() 318 elif self.cwd == 'ros-root': 319 cwd = get_ros_root() 320 else: 321 cwd = rospkg.get_ros_home() 322 if not os.path.exists(cwd): 323 try: 324 os.makedirs(cwd) 325 except OSError: 326 # exist_ok=True 327 pass 328 329 _logger.info("process[%s]: start w/ args [%s]", self.name, self.args) 330 _logger.info("process[%s]: cwd will be [%s]", self.name, cwd) 331 332 try: 333 preexec_function = os.setsid 334 close_file_descriptor = True 335 except AttributeError: 336 preexec_function = None 337 close_file_descriptor = False 338 339 try: 340 self.popen = subprocess.Popen(self.args, cwd=cwd, stdout=logfileout, stderr=logfileerr, env=full_env, close_fds=close_file_descriptor, preexec_fn=preexec_function) 341 except OSError as e: 342 self.started = True # must set so is_alive state is correct 343 _logger.error("OSError(%d, %s)", e.errno, e.strerror) 344 if e.errno == errno.ENOEXEC: #Exec format error 345 raise FatalProcessLaunch("Unable to launch [%s]. \nIf it is a script, you may be missing a '#!' declaration at the top."%self.name) 346 elif e.errno == errno.ENOENT: #no such file or directory 347 raise FatalProcessLaunch("""Roslaunch got a '%s' error while attempting to run: 348 349 %s 350 351 Please make sure that all the executables in this command exist and have 352 executable permission. This is often caused by a bad launch-prefix."""%(e.strerror, ' '.join(self.args))) 353 else: 354 raise FatalProcessLaunch("unable to launch [%s]: %s"%(' '.join(self.args), e.strerror)) 355 356 self.started = True 357 # Check that the process is either still running (poll returns 358 # None) or that it completed successfully since when we 359 # launched it above (poll returns the return code, 0). 360 poll_result = self.popen.poll() 361 if poll_result is None or poll_result == 0: 362 self.pid = self.popen.pid 363 printlog_bold("process[%s]: started with pid [%s]"%(self.name, self.pid)) 364 return True 365 else: 366 printerrlog("failed to start local process: %s"%(' '.join(self.args))) 367 return False 368 finally: 369 self.lock.release()
370
371 - def _log_name(self):
372 return self.name.replace('/', '-')
373
374 - def is_alive(self):
375 """ 376 @return: True if process is still running 377 @rtype: bool 378 """ 379 if not self.started: #not started yet 380 return True 381 if self.stopped or self.popen is None: 382 if self.time_of_death is None: 383 self.time_of_death = time.time() 384 return False 385 self.exit_code = self.popen.poll() 386 if self.exit_code is not None: 387 if self.time_of_death is None: 388 self.time_of_death = time.time() 389 return False 390 return True
391
392 - def get_exit_description(self):
393 """ 394 @return: human-readable description of exit state 395 @rtype: str 396 """ 397 if self.exit_code is None: 398 output = 'process has died without exit code [pid %s, cmd %s].'%(self.pid, ' '.join(self.args)) 399 elif self.exit_code != 0: 400 output = 'process has died [pid %s, exit code %s, cmd %s].'%(self.pid, self.exit_code, ' '.join(self.args)) 401 else: 402 output = 'process has finished cleanly' 403 404 if self.log_dir: 405 # #973: include location of output location in message 406 output += '\nlog file: %s*.log'%(os.path.join(self.log_dir, self._log_name())) 407 return output
408
409 - def _stop_unix(self, errors):
410 """ 411 UNIX implementation of process killing 412 413 @param errors: error messages. stop() will record messages into this list. 414 @type errors: [str] 415 """ 416 self.exit_code = self.popen.poll() 417 if self.exit_code is not None: 418 _logger.debug("process[%s].stop(): process has already returned %s", self.name, self.exit_code) 419 #print "process[%s].stop(): process has already returned %s"%(self.name, self.exit_code) 420 self.popen = None 421 self.stopped = True 422 return 423 424 pid = self.popen.pid 425 pgid = os.getpgid(pid) 426 _logger.info("process[%s]: killing os process with pid[%s] pgid[%s]", self.name, pid, pgid) 427 428 try: 429 # Start with SIGINT and escalate from there. 430 _logger.info("[%s] sending SIGINT to pgid [%s]", self.name, pgid) 431 os.killpg(pgid, signal.SIGINT) 432 _logger.info("[%s] sent SIGINT to pgid [%s]", self.name, pgid) 433 timeout_t = time.time() + self.sigint_timeout 434 retcode = self.popen.poll() 435 while time.time() < timeout_t and retcode is None: 436 time.sleep(0.1) 437 retcode = self.popen.poll() 438 # Escalate non-responsive process 439 if retcode is None: 440 printerrlog("[%s] escalating to SIGTERM"%self.name) 441 timeout_t = time.time() + self.sigterm_timeout 442 os.killpg(pgid, signal.SIGTERM) 443 _logger.info("[%s] sent SIGTERM to pgid [%s]"%(self.name, pgid)) 444 retcode = self.popen.poll() 445 while time.time() < timeout_t and retcode is None: 446 time.sleep(0.2) 447 _logger.debug('poll for retcode') 448 retcode = self.popen.poll() 449 if retcode is None: 450 printerrlog("[%s] escalating to SIGKILL"%self.name) 451 errors.append("process[%s, pid %s]: required SIGKILL. May still be running."%(self.name, pid)) 452 try: 453 os.killpg(pgid, signal.SIGKILL) 454 _logger.info("[%s] sent SIGKILL to pgid [%s]"%(self.name, pgid)) 455 # #2096: don't block on SIGKILL, because this results in more orphaned processes overall 456 #self.popen.wait() 457 #os.wait() 458 _logger.info("process[%s]: sent SIGKILL", self.name) 459 except OSError as e: 460 if e.args[0] == 3: 461 printerrlog("no [%s] process with pid [%s]"%(self.name, pid)) 462 else: 463 printerrlog("errors shutting down [%s], see log for details"%self.name) 464 _logger.error(traceback.format_exc()) 465 else: 466 _logger.info("process[%s]: SIGTERM killed with return value %s", self.name, retcode) 467 else: 468 _logger.info("process[%s]: SIGINT killed with return value %s", self.name, retcode) 469 470 finally: 471 self.popen = None
472
473 - def _stop_win32(self, errors):
474 """ 475 Win32 implementation of process killing. In part, refer to 476 477 http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/347462 478 479 Note that it doesn't work as completely as _stop_unix as it can't utilise 480 group id's. This means that any program which forks children underneath it 481 won't get caught by this kill mechanism. 482 483 @param errors: error messages. stop() will record messages into this list. 484 @type errors: [str] 485 """ 486 self.exit_code = self.popen.poll() 487 if self.exit_code is not None: 488 _logger.debug("process[%s].stop(): process has already returned %s", self.name, self.exit_code) 489 self.popen = None 490 self.stopped = True 491 return 492 493 pid = self.popen.pid 494 _logger.info("process[%s]: killing os process/subprocesses with pid[%s]", self.name, pid) 495 # windows has no group id's :( 496 try: 497 # Start with SIGINT and escalate from there. 498 _logger.info("[%s] running taskkill pid tree [%s]", self.name, pid) 499 subprocess.call(['taskkill', '/F', '/T', '/PID', str(pid)]) 500 _logger.info("[%s] run taskkill pid tree [%s]", self.name, pid) 501 timeout_t = time.time() + self.sigint_timeout 502 retcode = self.popen.poll() 503 while time.time() < timeout_t and retcode is None: 504 time.sleep(0.1) 505 retcode = self.popen.poll() 506 if retcode is None: 507 printerrlog("errors shutting down [%s], see log for details"%self.name) 508 _logger.error("errors shutting down [%s], see log for details"%self.name) 509 else: 510 _logger.info("process[%s]: SIGINT killed with return value %s", self.name, retcode) 511 finally: 512 self.popen = None
513
514 - def stop(self, errors=None):
515 """ 516 Stop the process. Record any significant error messages in the errors parameter 517 518 @param errors: error messages. stop() will record messages into this list. 519 @type errors: [str] 520 """ 521 if errors is None: 522 errors = [] 523 super(LocalProcess, self).stop(errors) 524 self.lock.acquire() 525 try: 526 try: 527 _logger.debug("process[%s].stop() starting", self.name) 528 if self.popen is None: 529 _logger.debug("process[%s].stop(): popen is None, nothing to kill") 530 return 531 if sys.platform in ['win32']: # cygwin seems to be ok 532 self._stop_win32(errors) 533 else: 534 self._stop_unix(errors) 535 except: 536 #traceback.print_exc() 537 _logger.error("[%s] EXCEPTION %s", self.name, traceback.format_exc()) 538 finally: 539 self.stopped = True 540 self.lock.release()
541 542 543 # #1595
544 -def _cleanup_remappings(args, prefix):
545 """ 546 Remove all instances of args that start with prefix. This is used 547 to remove args that were previously added (and are now being 548 regenerated due to respawning) 549 """ 550 existing_args = [a for a in args if a.startswith(prefix)] 551 for a in existing_args: 552 args.remove(a) 553 return args
554