Source code for rocon_python_utils.system.popen

#
# License: BSD
#   https://raw.github.com/robotics-in-concert/rocon_tools/license/LICENSE
#
##############################################################################
# Description
##############################################################################

"""
.. module:: system.popen
   :platform: Unix
   :synopsis: Workaround a few of the constrations in the official popen impl.

This module helps workaround the official popen with another implementation
that solves a few problems (post-exec callbacks, etc)

----

"""

##############################################################################
# Imports
##############################################################################

# system
import os
import threading
import subprocess
import signal

##############################################################################
# Subprocess
##############################################################################


[docs]class Popen(object): ''' Use this if you want to attach a postexec function to popen (which is not supported by popen at all). It also defaults setting and terminating whole process groups (something we do quite often in ros). **Usage:** This is what you'd do starting a rosrunnable with a listener for termination. .. code-block:: python def process_listener(): print("subprocess terminating") command_args = ['rosrun', package_name, rosrunnable_filename, '__name:=my_node'] process = rocon_python_utils.system.Popen(command_args, postexec_fn=process_listener) ''' __slots__ = [ 'pid', '_proc', '_thread', '_external_preexec_fn', '_shell', '_env', 'terminate' ]
[docs] def __init__(self, popen_args, shell=False, preexec_fn=None, postexec_fn=None, env=None): ''' :param popen_args: list/tuple of usual popen args :type popen_args: str[] :param bool shell: same as the shell argument passed to subprocess.Popen :param preexec_fn: usual popen pre-exec function :type preexec_fn: method with no args :param postexec_fn: the callback which we support for postexec. :type postexec_fn: method with no args :param dict env: a customised environment to run the process in. ''' self.pid = None self._proc = None self._shell = shell self._env = env self._external_preexec_fn = preexec_fn self._thread = threading.Thread(target=self._run_in_thread, args=(popen_args, self._preexec_fn, postexec_fn)) self._thread.start()
[docs] def send_signal(self, sig): """ Send the process a posix signal. See `man 7 signal` for a list and pass them by keyword (e.g. signal.SIGINT) or directly by integer value. :param int sig: one of the posix signals. """ os.killpg(self._proc.pid, sig) # This would be the normal way if not defaulting settings for process groups #self._proc.send_signal(sig)
def _preexec_fn(self): """ A default preexec function that is usually applicable in terminal launching situations since we need to take care of process groups. See http://stackoverflow.com/questions/3791398/how-to-stop-python-from-propagating-signals-to-subprocesses for some interesting information around this topic, specifically with resolving signal forwarding and use of preexec - there are some differences between 2.x and 3.2+ handling in subprocess. """ os.setpgrp() if self._external_preexec_fn is not None: self._external_preexec_fn()
[docs] def terminate(self): ''' :raises: :exc:`.OSError` if the process has already shut down. ''' return os.killpg(self._proc.pid, signal.SIGTERM) # if we were not setting process groups #return self._proc.terminate() if self._proc is not None else None
def _run_in_thread(self, popen_args, preexec_fn, postexec_fn): ''' Worker function for the thread, creates the subprocess itself. ''' if preexec_fn is not None: if self._shell == True: #print("rocon_python_utils.os.Popen: %s" % " ".join(popen_args)) self._proc = subprocess.Popen(" ".join(popen_args), shell=True, preexec_fn=preexec_fn, env=self._env) else: #print("rocon_python_utils.os..Popen: %s" % popen_args) self._proc = subprocess.Popen(popen_args, shell=self._shell, preexec_fn=preexec_fn, env=self._env) else: self._proc = subprocess.Popen(popen_args, shell=self._shell, env=self._env) self.pid = self._proc.pid self._proc.wait() if postexec_fn is not None: postexec_fn() return