Package roslaunch :: Module nodeprocess
[frames | no frames]

Source Code for Module roslaunch.nodeprocess

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  """ 
 36  Local process implementation for running and monitoring nodes. 
 37  """ 
 38   
 39  import errno 
 40  import os 
 41  import signal 
 42  import subprocess  
 43  import time 
 44  import traceback 
 45   
 46  import rospkg 
 47   
 48  from roslaunch.core import * 
 49  #from roslaunch.core import setup_env 
 50  from roslaunch.node_args import create_local_process_args 
 51  from roslaunch.pmon import Process, FatalProcessLaunch 
 52   
 53  from rosmaster.master_api import NUM_WORKERS 
 54   
 55  import logging 
 56  _logger = logging.getLogger("roslaunch") 
 57   
 58  _TIMEOUT_SIGINT  = 15.0 #seconds 
 59  _TIMEOUT_SIGTERM = 2.0 #seconds 
 60   
 61  _counter = 0 
62 -def _next_counter():
63 global _counter 64 _counter += 1 65 return _counter
66
def create_master_process(run_id, type_, ros_root, port, num_workers=NUM_WORKERS, timeout=None, master_logger_level=False):
    """
    Build the L{LocalProcess} wrapper used to run a ROS master.

    The executable is looked up on the search path (catkin/fuerte and
    later), so C{ros_root} is only logged here.

    @param run_id: run_id of launch, forwarded to L{LocalProcess}
    @type  run_id: str
    @param type_: name of master executable; must be Master.ROSMASTER
        or Master.ZENMASTER (deprecated alias of rosmaster)
    @type  type_: str
    @param ros_root: ROS_ROOT environment setting
    @type  ros_root: str
    @param port: port to launch master on (1-65535)
    @type  port: int
    @param num_workers: number of worker threads.
    @type  num_workers: int
    @param timeout: socket timeout for connections. Omitted from the
        command line when None.
    @type  timeout: float
    @param master_logger_level: rosmaster.master logger debug level.
        Omitted from the command line when false.
    @type  master_logger_level: str or False
    @return: process wrapper for the master, marked as required
    @rtype: L{LocalProcess}
    @raise RLException: if type_ or port is invalid
    """
    if port < 1 or port > 65535:
        raise RLException("invalid port assignment: %s"%port)

    _logger.info("create_master_process: %s, %s, %s, %s, %s, %s", type_, ros_root, port, num_workers, timeout, master_logger_level)

    # zenmaster is deprecated and aliased to rosmaster
    if type_ not in (Master.ROSMASTER, Master.ZENMASTER):
        raise RLException("unknown master typ_: %s"%type_)

    # catkin/fuerte: no longer use ROS_ROOT-relative executables, search path instead
    cmd = [type_, '--core', '-p', str(port), '-w', str(num_workers)]
    if timeout is not None:
        cmd.extend(['-t', str(timeout)])
    if master_logger_level:
        cmd.extend(['--master-logger-level', str(master_logger_level)])

    _logger.info("process[master]: launching with args [%s]", cmd)
    # the master's stdout/stderr are not redirected to log files
    return LocalProcess(run_id, 'rosmaster', 'master', cmd, os.environ, False, None, required=True)
104
def create_node_process(run_id, node, master_uri):
    """
    Factory for generating processes for launching local ROS
    nodes. Also registers the process with the L{ProcessMonitor} so that
    events can be generated when the process dies.

    @param run_id: run_id of launch
    @type  run_id: str
    @param node: node to launch. Node name must be assigned.
    @type  node: L{Node}
    @param master_uri: API URI for master node
    @type  master_uri: str
    @return: local process instance
    @rtype: L{LocalProcess}
    @raise NodeParamsException: If the node's parameters are improperly specified
    @raise RLException: if no machine has been selected for the node
    @raise ValueError: if the node has no name assigned
    """
    _logger.info("create_node_process: package[%s] type[%s] machine[%s] master_uri[%s]", node.package, node.type, node.machine, master_uri)
    # check input args
    machine = node.machine
    if machine is None:
        raise RLException("Internal error: no machine selected for node of type [%s/%s]"%(node.package, node.type))
    if not node.name:
        raise ValueError("node name must be assigned")

    # - setup env for process (vars must be strings for os.environ)
    env = setup_env(node, machine, master_uri)

    # we have to include the counter to prevent potential name
    # collisions between the two branches
    name = "%s-%s"%(rosgraph.names.ns_join(node.namespace, node.name), _next_counter())
    # process names are relative: drop a single leading namespace separator
    if name.startswith('/'):
        name = name[1:]

    _logger.info('process[%s]: env[%s]', name, env)

    args = create_local_process_args(node, machine)

    _logger.info('process[%s]: args[%s]', name, args)

    # default for node.output not set is 'log'
    log_output = node.output != 'screen'
    _logger.debug('process[%s]: returning LocalProcess wrapper', name)
    return LocalProcess(run_id, node.package, name, args, env, log_output,
                        respawn=node.respawn, respawn_delay=node.respawn_delay,
                        required=node.required, cwd=node.cwd)
154 155
class LocalProcess(Process):
    """
    Process launched on local machine
    """

    def __init__(self, run_id, package, name, args, env, log_output,
                 respawn=False, respawn_delay=0.0, required=False, cwd=None,
                 is_node=True):
        """
        @param run_id: unique run ID for this roslaunch. Used to
          generate log directory location. run_id may be None if this
          feature is not being used.
        @type  run_id: str
        @param package: name of package process is part of
        @type  package: str
        @param name: name of process
        @type  name: str
        @param args: list of arguments to process
        @type  args: [str]
        @param env: environment dictionary for process
        @type  env: {str : str}
        @param log_output: if True, log output streams of process
        @type  log_output: bool
        @param respawn: respawn process if it dies (default is False)
        @type  respawn: bool
        @param respawn_delay: respawn process after a delay
        @type  respawn_delay: float
        @param required: forwarded unchanged to L{Process} (default is False)
        @type  required: bool
        @param cwd: working directory of process, or None. start()
          recognises 'node', 'cwd' and 'ros-root'; any other value
          selects the ROS home directory.
        @type  cwd: str
        @param is_node: (optional) if True, process is ROS node and accepts ROS node command-line arguments. Default: True
        @type  is_node: bool
        """
        super(LocalProcess, self).__init__(package, name, args, env,
                                           respawn, respawn_delay, required)
        self.run_id = run_id
        self.popen = None
        self.log_output = log_output
        self.started = False
        self.stopped = False
        self.cwd = cwd
        self.log_dir = None
        self.pid = -1
        self.is_node = is_node

    # NOTE: in the future, info() is going to have to be sufficient for relaunching a process
    def get_info(self):
        """
        Get all data about this process in dictionary form

        @return: the dictionary from L{Process.get_info} extended with
          'pid' and 'log_output', plus 'run_id' and 'cwd' when set
        @rtype: dict
        """
        info = super(LocalProcess, self).get_info()
        info['pid'] = self.pid
        if self.run_id:
            info['run_id'] = self.run_id
        info['log_output'] = self.log_output
        if self.cwd is not None:
            info['cwd'] = self.cwd
        return info

    def _configure_logging(self):
        """
        Configure logging of node's log file and stdout/stderr.

        Creates the log directory if needed, records it in
        C{self.log_dir} and, for nodes, replaces any C{__log:=}
        argument in C{self.args} with one pointing into that directory.

        @return: open file objects for stdout and stderr
          logging. Values are None if stdout/stderr are not logged.
        @rtype: file, file
        @raise RLException: if the log directory cannot be created
        """
        log_dir = rospkg.get_log_dir(env=os.environ)
        if self.run_id:
            log_dir = os.path.join(log_dir, self.run_id)
        if not os.path.exists(log_dir):
            try:
                os.makedirs(log_dir)
            except OSError as e:
                if e.errno == errno.EEXIST and os.path.isdir(log_dir):
                    # someone else created the directory between the
                    # exists() check and makedirs(): nothing to do
                    pass
                elif e.errno == errno.EACCES:
                    raise RLException("unable to create directory for log file [%s].\nPlease check permissions."%log_dir)
                else:
                    raise RLException("unable to create directory for log file [%s]: %s"%(log_dir, e.strerror))
        # #973: save log dir for error messages
        self.log_dir = log_dir

        # send stdout/stderr to file. in the case of respawning, we have to
        # open in append mode
        # note: logfileerr: disabling in favor of stderr appearing in the console.
        # will likely reinstate once roserr/rosout is more properly used.
        logfileout = logfileerr = None
        logfname = self._log_name()

        if self.log_output:
            outf, errf = [os.path.join(log_dir, '%s-%s.log'%(logfname, n)) for n in ['stdout', 'stderr']]
            if self.respawn:
                mode = 'a'
            else:
                mode = 'w'
            logfileout = open(outf, mode)
            if is_child_mode():
                try:
                    logfileerr = open(errf, mode)
                except Exception:
                    # don't leak the stdout handle if stderr can't be opened
                    logfileout.close()
                    raise

        # #986: pass in logfile name to node
        if self.is_node:
            # #1595: on respawn, these keep appending
            self.args = _cleanup_remappings(self.args, '__log:=')
            self.args.append("__log:=%s"%os.path.join(log_dir, "%s.log"%(logfname)))

        return logfileout, logfileerr

    def start(self):
        """
        Start the process.

        @return: True if the process is running, or already exited
          with code 0, right after being launched; False otherwise
        @rtype: bool
        @raise FatalProcessLaunch: if process cannot be started and it
          is not likely to ever succeed
        """
        super(LocalProcess, self).start()
        # acquire outside the try so that the finally clause never
        # releases a lock that was not taken
        self.lock.acquire()
        try:
            if self.started:
                _logger.info("process[%s]: restarting os process", self.name)
            else:
                _logger.info("process[%s]: starting os process", self.name)
            self.started = self.stopped = False

            full_env = self.env

            # _configure_logging() can mutate self.args
            try:
                logfileout, logfileerr = self._configure_logging()
            except Exception as e:
                _logger.error(traceback.format_exc())
                printerrlog("[%s] ERROR: unable to configure logging [%s]"%(self.name, str(e)))
                # it's not safe to inherit from this process as
                # rostest changes stdout to a StringIO, which is not a
                # proper file.
                # NOTE(review): nothing in this class reads these pipes
                logfileout, logfileerr = subprocess.PIPE, subprocess.PIPE

            if self.cwd == 'node':
                cwd = os.path.dirname(self.args[0])
            elif self.cwd == 'cwd':
                cwd = os.getcwd()
            elif self.cwd == 'ros-root':
                cwd = get_ros_root()
            else:
                cwd = rospkg.get_ros_home()
            if not os.path.exists(cwd):
                try:
                    os.makedirs(cwd)
                except OSError:
                    # exist_ok=True
                    pass

            _logger.info("process[%s]: start w/ args [%s]", self.name, self.args)
            _logger.info("process[%s]: cwd will be [%s]", self.name, cwd)

            try:
                self.popen = subprocess.Popen(self.args, cwd=cwd, stdout=logfileout, stderr=logfileerr, env=full_env, close_fds=True, preexec_fn=os.setsid)
            except OSError as e:
                self.started = True # must set so is_alive state is correct
                _logger.error("OSError(%d, %s)", e.errno, e.strerror)
                if e.errno == errno.ENOEXEC: #Exec format error
                    raise FatalProcessLaunch("Unable to launch [%s]. \nIf it is a script, you may be missing a '#!' declaration at the top."%self.name)
                elif e.errno == errno.ENOENT: #no such file or directory
                    raise FatalProcessLaunch("""Roslaunch got a '%s' error while attempting to run:

%s

Please make sure that all the executables in this command exist and have
executable permission. This is often caused by a bad launch-prefix."""%(e.strerror, ' '.join(self.args)))
                else:
                    raise FatalProcessLaunch("unable to launch [%s]: %s"%(' '.join(self.args), e.strerror))
            finally:
                # the child (if any) holds its own copies of the log
                # file descriptors; close ours explicitly. None and
                # subprocess.PIPE have no close() and are skipped.
                for stream in (logfileout, logfileerr):
                    if hasattr(stream, 'close'):
                        stream.close()

            self.started = True
            # Check that the process is either still running (poll returns
            # None) or that it completed successfully since when we
            # launched it above (poll returns the return code, 0).
            poll_result = self.popen.poll()
            if poll_result is None or poll_result == 0:
                self.pid = self.popen.pid
                printlog_bold("process[%s]: started with pid [%s]"%(self.name, self.pid))
                return True
            else:
                printerrlog("failed to start local process: %s"%(' '.join(self.args)))
                return False
        finally:
            self.lock.release()

    def _log_name(self):
        """
        @return: process name with '/' replaced by '-', used as the
          base name of this process's log files
        @rtype: str
        """
        return self.name.replace('/', '-')

    def is_alive(self):
        """
        Poll the OS process and record its exit code and time of death.

        @return: True if process is still running, or has not been
          started yet
        @rtype: bool
        """
        if not self.started: #not started yet
            return True
        if self.stopped or self.popen is None:
            if self.time_of_death is None:
                self.time_of_death = time.time()
            return False
        self.exit_code = self.popen.poll()
        if self.exit_code is not None:
            if self.time_of_death is None:
                self.time_of_death = time.time()
            return False
        return True

    def get_exit_description(self):
        """
        @return: human-readable description of exit state
        @rtype: str
        """
        if self.exit_code is None:
            output = 'process has died without exit code [pid %s, cmd %s].'%(self.pid, ' '.join(self.args))
        elif self.exit_code != 0:
            output = 'process has died [pid %s, exit code %s, cmd %s].'%(self.pid, self.exit_code, ' '.join(self.args))
        else:
            output = 'process has finished cleanly'

        if self.log_dir:
            # #973: include location of output location in message
            output += '\nlog file: %s*.log'%(os.path.join(self.log_dir, self._log_name()))
        return output

    def _wait_for_exit(self, timeout, poll_interval, log_polls=False):
        """
        Poll C{self.popen} until it exits or C{timeout} seconds elapse.

        @param timeout: maximum time to wait, in seconds
        @type  timeout: float
        @param poll_interval: sleep between polls, in seconds
        @type  poll_interval: float
        @param log_polls: if True, emit a debug log entry for every poll
        @type  log_polls: bool
        @return: return code of the process, or None if still running
        @rtype: int or None
        """
        deadline = time.time() + timeout
        retcode = self.popen.poll()
        while time.time() < deadline and retcode is None:
            time.sleep(poll_interval)
            if log_polls:
                _logger.debug('poll for retcode')
            retcode = self.popen.poll()
        return retcode

    def _stop_unix(self, errors):
        """
        UNIX implementation of process killing. Signals the whole
        process group, escalating SIGINT -> SIGTERM -> SIGKILL.

        @param errors: error messages. stop() will record messages into this list.
        @type  errors: [str]
        """
        self.exit_code = self.popen.poll()
        if self.exit_code is not None:
            _logger.debug("process[%s].stop(): process has already returned %s", self.name, self.exit_code)
            self.popen = None
            self.stopped = True
            return

        try:
            # resolved inside the try so that self.popen is cleared
            # even if the process vanishes before getpgid()
            pid = self.popen.pid
            pgid = os.getpgid(pid)
            _logger.info("process[%s]: killing os process with pid[%s] pgid[%s]", self.name, pid, pgid)

            # Start with SIGINT and escalate from there.
            _logger.info("[%s] sending SIGINT to pgid [%s]", self.name, pgid)
            os.killpg(pgid, signal.SIGINT)
            _logger.info("[%s] sent SIGINT to pgid [%s]", self.name, pgid)
            retcode = self._wait_for_exit(_TIMEOUT_SIGINT, 0.1)
            if retcode is not None:
                _logger.info("process[%s]: SIGINT killed with return value %s", self.name, retcode)
                return

            # Escalate non-responsive process
            printerrlog("[%s] escalating to SIGTERM"%self.name)
            os.killpg(pgid, signal.SIGTERM)
            _logger.info("[%s] sent SIGTERM to pgid [%s]"%(self.name, pgid))
            retcode = self._wait_for_exit(_TIMEOUT_SIGTERM, 0.2, log_polls=True)
            if retcode is not None:
                _logger.info("process[%s]: SIGTERM killed with return value %s", self.name, retcode)
                return

            printerrlog("[%s] escalating to SIGKILL"%self.name)
            errors.append("process[%s, pid %s]: required SIGKILL. May still be running."%(self.name, pid))
            try:
                os.killpg(pgid, signal.SIGKILL)
                _logger.info("[%s] sent SIGKILL to pgid [%s]"%(self.name, pgid))
                # #2096: don't block on SIGKILL, because this results in more orphaned processes overall
                _logger.info("process[%s]: sent SIGKILL", self.name)
            except OSError as e:
                if e.errno == errno.ESRCH:
                    printerrlog("no [%s] process with pid [%s]"%(self.name, pid))
                else:
                    printerrlog("errors shutting down [%s], see log for details"%self.name)
                    _logger.error(traceback.format_exc())
        finally:
            self.popen = None

    def _stop_win32(self, errors):
        """
        Win32 implementation of process killing. In part, refer to

          http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/347462

        Note that it doesn't work as completely as _stop_unix as it can't utilise
        group id's. This means that any program which forks children underneath it
        won't get caught by this kill mechanism.

        @param errors: error messages. stop() will record messages into this list.
        @type  errors: [str]
        """
        self.exit_code = self.popen.poll()
        if self.exit_code is not None:
            _logger.debug("process[%s].stop(): process has already returned %s", self.name, self.exit_code)
            self.popen = None
            self.stopped = True
            return

        pid = self.popen.pid
        _logger.info("process[%s]: killing os process/subprocesses with pid[%s]", self.name, pid)
        # windows has no group id's :(
        try:
            # Start with SIGINT and escalate from there.
            _logger.info("[%s] sending SIGINT to pid [%s]", self.name, pid)
            os.kill(pid, signal.SIGINT)
            _logger.info("[%s] sent SIGINT to pid [%s]", self.name, pid)
            retcode = self._wait_for_exit(_TIMEOUT_SIGINT, 0.1)
            if retcode is not None:
                _logger.info("process[%s]: SIGINT killed with return value %s", self.name, retcode)
                return

            # Escalate non-responsive process. os.killpg() does not
            # exist on Windows, so signal the pid directly.
            printerrlog("[%s] escalating to SIGTERM"%self.name)
            os.kill(pid, signal.SIGTERM)
            _logger.info("[%s] sent SIGTERM to pid [%s]"%(self.name, pid))
            retcode = self._wait_for_exit(_TIMEOUT_SIGTERM, 0.2, log_polls=True)
            if retcode is not None:
                _logger.info("process[%s]: SIGTERM killed with return value %s", self.name, retcode)
                return

            printerrlog("[%s] escalating to SIGKILL"%self.name)
            errors.append("process[%s, pid %s]: required SIGKILL. May still be running."%(self.name, pid))
            try:
                # signal.SIGKILL is not defined on Windows; Popen.kill()
                # is the portable way to force termination
                self.popen.kill()
                _logger.info("[%s] sent SIGKILL to pid [%s]"%(self.name, pid))
                # #2096: don't block on SIGKILL, because this results in more orphaned processes overall
                _logger.info("process[%s]: sent SIGKILL", self.name)
            except OSError as e:
                if e.errno == errno.ESRCH:
                    printerrlog("no [%s] process with pid [%s]"%(self.name, pid))
                else:
                    printerrlog("errors shutting down [%s], see log for details"%self.name)
                    _logger.error(traceback.format_exc())
        finally:
            self.popen = None

    def stop(self, errors=None):
        """
        Stop the process. Record any significant error messages in the errors parameter

        @param errors: error messages. stop() will record messages into this list.
        @type  errors: [str]
        """
        # imported locally: this module does not import sys itself
        import sys

        if errors is None:
            errors = []
        super(LocalProcess, self).stop(errors)
        self.lock.acquire()
        try:
            try:
                _logger.debug("process[%s].stop() starting", self.name)
                if self.popen is None:
                    _logger.debug("process[%s].stop(): popen is None, nothing to kill", self.name)
                    return
                if sys.platform in ['win32']: # cygwin seems to be ok
                    self._stop_win32(errors)
                else:
                    self._stop_unix(errors)
            except BaseException:
                # deliberately catches everything, as the original bare
                # except did: a failure while stopping is logged, not raised
                _logger.error("[%s] EXCEPTION %s", self.name, traceback.format_exc())
        finally:
            self.stopped = True
            self.lock.release()
536 537 538 # #1595
def _cleanup_remappings(args, prefix):
    """
    Strip every argument beginning with prefix from args, in place.
    This is used to drop arguments added on an earlier launch that are
    about to be regenerated because the process is respawning.

    @param args: argument list; modified in place
    @type  args: [str]
    @param prefix: leading text identifying the arguments to remove
    @type  prefix: str
    @return: the same list object that was passed in
    @rtype: [str]
    """
    # slice assignment keeps the caller's list object
    args[:] = [arg for arg in args if not arg.startswith(prefix)]
    return args
549