process.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # Copyright 2011 Facebook
00004 #
00005 # Licensed under the Apache License, Version 2.0 (the "License"); you may
00006 # not use this file except in compliance with the License. You may obtain
00007 # a copy of the License at
00008 #
00009 #     http://www.apache.org/licenses/LICENSE-2.0
00010 #
00011 # Unless required by applicable law or agreed to in writing, software
00012 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00013 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
00014 # License for the specific language governing permissions and limitations
00015 # under the License.
00016 
00017 """Utilities for working with multiple processes, including both forking
00018 the server into multiple processes and managing subprocesses.
00019 """
00020 
00021 from __future__ import absolute_import, division, print_function, with_statement
00022 
00023 import errno
00024 import os
00025 import signal
00026 import subprocess
00027 import sys
00028 import time
00029 
00030 from binascii import hexlify
00031 
00032 from tornado import ioloop
00033 from tornado.iostream import PipeIOStream
00034 from tornado.log import gen_log
00035 from tornado.platform.auto import set_close_exec
00036 from tornado import stack_context
00037 from tornado.util import errno_from_exception
00038 
# ``multiprocessing`` cannot be imported in some restricted environments
# (Google App Engine, for example); cpu_count() treats None as "unknown".
try:
    import multiprocessing
except ImportError:
    multiprocessing = None

# Python 3 merged ``long`` into ``int``; provide a name that works on both
# major versions for the seed arithmetic in _reseed_random().
long = int if sys.version_info[0] >= 3 else long
00049 
00050 
def cpu_count():
    """Returns the number of processors on this machine.

    Returns 1 immediately if ``multiprocessing`` is unavailable. Otherwise
    it tries ``multiprocessing.cpu_count()`` first, then
    ``os.sysconf("SC_NPROCESSORS_CONF")``. If neither yields a positive
    count, an error is logged and 1 is returned.
    """
    if multiprocessing is None:
        return 1
    try:
        return multiprocessing.cpu_count()
    except NotImplementedError:
        pass
    try:
        count = os.sysconf("SC_NPROCESSORS_CONF")
    except (AttributeError, ValueError, OSError):
        # AttributeError: os.sysconf is not available on this platform.
        # ValueError: the configuration name is not recognized.
        # OSError: the underlying sysconf() call reported an error.
        count = None
    # sysconf returns -1 for an undefined value; never report <= 0 CPUs,
    # since callers use this as the number of processes to fork.
    if count is not None and count > 0:
        return count
    gen_log.error("Could not detect number of processors; assuming 1")
    return 1
00065 
00066 
def _reseed_random():
    """Re-seed the ``random`` module, but only if it has already been imported.

    ``fork_processes`` calls this in each freshly forked child so the
    children do not all continue the parent's pseudo-random sequence.
    """
    if 'random' in sys.modules:
        import random
        # Prefer 16 bytes of OS entropy, which matches what random.seed()
        # itself does when os.urandom exists. Without os.urandom, fall back
        # to a millisecond timestamp mixed with the pid so that children
        # forked at the same moment still diverge.
        try:
            entropy = os.urandom(16)
        except NotImplementedError:
            new_seed = int(time.time() * 1000) ^ os.getpid()
        else:
            new_seed = long(hexlify(entropy), 16)
        random.seed(new_seed)
00079 
00080 
def _pipe_cloexec():
    """Create a pipe and mark both of its ends close-on-exec.

    Returns the ``(read_fd, write_fd)`` pair produced by ``os.pipe()``.
    """
    pipe_ends = os.pipe()
    for fd in pipe_ends:  # read end first, then write end
        set_close_exec(fd)
    return pipe_ends
00086 
00087 
# Task id of the current process. fork_processes() sets it in each child it
# starts; it stays None otherwise. Read it through task_id().
_task_id = None


def fork_processes(num_processes, max_restarts=100):
    """Starts multiple worker processes.

    If ``num_processes`` is None or <= 0, we detect the number of cores
    available on this machine and fork that number of child
    processes. If ``num_processes`` is given and > 0, we fork that
    specific number of sub-processes.

    Since we use processes and not threads, there is no shared memory
    between any server code.

    Note that multiple processes are not compatible with the autoreload
    module (or the ``autoreload=True`` option to `tornado.web.Application`
    which defaults to True when ``debug=True``).
    When using multiple processes, no IOLoops can be created or
    referenced until after the call to ``fork_processes``.

    In each child process, ``fork_processes`` returns its *task id*, a
    number from 0 to ``num_processes - 1``.  Processes that exit
    abnormally (due to a signal or non-zero exit status) are restarted
    with the same id (up to ``max_restarts`` times).  In the parent
    process, ``fork_processes`` never returns normally: it calls
    ``sys.exit(0)`` once all child processes have exited normally, and
    raises ``RuntimeError`` if more than ``max_restarts`` restarts are
    needed.
    """
    global _task_id
    assert _task_id is None
    if num_processes is None or num_processes <= 0:
        num_processes = cpu_count()
    if ioloop.IOLoop.initialized():
        raise RuntimeError("Cannot run in multiple processes: IOLoop instance "
                           "has already been initialized. You cannot call "
                           "IOLoop.instance() before calling fork_processes()")
    gen_log.info("Starting %d processes", num_processes)
    children = {}  # child pid -> task id

    def start_child(task):
        """Fork one worker for ``task``.

        Returns ``task`` in the new child and None in the parent.
        """
        pid = os.fork()
        if pid == 0:
            # child process
            _reseed_random()
            global _task_id
            _task_id = task
            return task
        children[pid] = task
        return None

    for i in range(num_processes):
        child_task = start_child(i)
        if child_task is not None:
            return child_task
    num_restarts = 0
    while children:
        try:
            pid, status = os.wait()
        except OSError as e:
            # Interrupted by a signal before any child changed state: wait again.
            if errno_from_exception(e) == errno.EINTR:
                continue
            raise
        if pid not in children:
            # Not a worker started by this function; ignore it.
            continue
        child_task = children.pop(pid)
        if os.WIFSIGNALED(status):
            gen_log.warning("child %d (pid %d) killed by signal %d, restarting",
                            child_task, pid, os.WTERMSIG(status))
        elif os.WEXITSTATUS(status) != 0:
            gen_log.warning("child %d (pid %d) exited with status %d, restarting",
                            child_task, pid, os.WEXITSTATUS(status))
        else:
            gen_log.info("child %d (pid %d) exited normally", child_task, pid)
            continue
        num_restarts += 1
        if num_restarts > max_restarts:
            raise RuntimeError("Too many child restarts, giving up")
        new_task = start_child(child_task)
        if new_task is not None:
            return new_task
    # All child processes exited cleanly, so exit the master process
    # instead of just returning to right after the call to
    # fork_processes (which will probably just start up another IOLoop
    # unless the caller checks the return value).
    sys.exit(0)


def task_id():
    """Returns the current task id, if any.

    Returns None if this process was not created by `fork_processes`.
    """
    # Only reads the module global, so no ``global`` declaration is needed.
    return _task_id
00182 
00183 
class Subprocess(object):
    """Wraps ``subprocess.Popen`` with IOStream support.

    The constructor is the same as ``subprocess.Popen`` with the following
    additions:

    * ``stdin``, ``stdout``, and ``stderr`` may have the value
      ``tornado.process.Subprocess.STREAM``, which will make the corresponding
      attribute of the resulting Subprocess a `.PipeIOStream`.
    * A new keyword argument ``io_loop`` may be used to pass in an IOLoop.
    """
    # Sentinel value for stdin/stdout/stderr that requests a PipeIOStream.
    STREAM = object()

    # Class-wide SIGCHLD bookkeeping shared by all instances: whether the
    # handler is installed, and pid -> Subprocess for processes with a
    # registered exit callback that have not been reaped yet.
    _initialized = False
    _waiting = {}

    def __init__(self, *args, **kwargs):
        self.io_loop = kwargs.pop('io_loop', None) or ioloop.IOLoop.current()
        # All FDs we create should be closed on error; those in to_close
        # (the child's ends of each pipe) should be closed in the parent
        # process on success.
        pipe_fds = []
        to_close = []
        try:
            # Pipe creation is inside the try so that a failure while setting
            # up a later pipe/stream still closes the fds already created.
            if kwargs.get('stdin') is Subprocess.STREAM:
                in_r, in_w = _pipe_cloexec()
                pipe_fds.extend((in_r, in_w))
                kwargs['stdin'] = in_r
                to_close.append(in_r)
                self.stdin = PipeIOStream(in_w, io_loop=self.io_loop)
            if kwargs.get('stdout') is Subprocess.STREAM:
                out_r, out_w = _pipe_cloexec()
                pipe_fds.extend((out_r, out_w))
                kwargs['stdout'] = out_w
                to_close.append(out_w)
                self.stdout = PipeIOStream(out_r, io_loop=self.io_loop)
            if kwargs.get('stderr') is Subprocess.STREAM:
                err_r, err_w = _pipe_cloexec()
                pipe_fds.extend((err_r, err_w))
                kwargs['stderr'] = err_w
                to_close.append(err_w)
                self.stderr = PipeIOStream(err_r, io_loop=self.io_loop)
            self.proc = subprocess.Popen(*args, **kwargs)
        except BaseException:
            # Clean up every fd we opened, then propagate the original error.
            for fd in pipe_fds:
                os.close(fd)
            raise
        for fd in to_close:
            os.close(fd)
        for attr in ['stdin', 'stdout', 'stderr', 'pid']:
            if not hasattr(self, attr):  # don't clobber streams set above
                setattr(self, attr, getattr(self.proc, attr))
        self._exit_callback = None
        self.returncode = None

    def set_exit_callback(self, callback):
        """Runs ``callback`` when this process exits.

        The callback takes one argument, the return code of the process.

        This method uses a ``SIGCHLD`` handler, which is a global setting
        and may conflict if you have other libraries trying to handle the
        same signal.  If you are using more than one ``IOLoop`` it may
        be necessary to call `Subprocess.initialize` first to designate
        one ``IOLoop`` to run the signal handlers.

        In many cases a close callback on the stdout or stderr streams
        can be used as an alternative to an exit callback if the
        signal handler is causing a problem.
        """
        self._exit_callback = stack_context.wrap(callback)
        Subprocess.initialize(self.io_loop)
        Subprocess._waiting[self.pid] = self
        # The child may already have exited before the handler was installed.
        Subprocess._try_cleanup_process(self.pid)

    @classmethod
    def initialize(cls, io_loop=None):
        """Initializes the ``SIGCHLD`` handler.

        The signal handler is run on an `.IOLoop` to avoid locking issues.
        Note that the `.IOLoop` used for signal handling need not be the
        same one used by individual Subprocess objects (as long as the
        ``IOLoops`` are each running in separate threads).
        """
        if cls._initialized:
            return
        if io_loop is None:
            io_loop = ioloop.IOLoop.current()
        # Keep the previous handler so uninitialize() can restore it.
        cls._old_sigchld = signal.signal(
            signal.SIGCHLD,
            lambda sig, frame: io_loop.add_callback_from_signal(cls._cleanup))
        cls._initialized = True

    @classmethod
    def uninitialize(cls):
        """Removes the ``SIGCHLD`` handler."""
        if not cls._initialized:
            return
        signal.signal(signal.SIGCHLD, cls._old_sigchld)
        cls._initialized = False

    @classmethod
    def _cleanup(cls):
        """Try to reap every process that is waiting for an exit callback."""
        # Iterate over a copy: _try_cleanup_process removes reaped pids.
        for pid in list(cls._waiting):
            cls._try_cleanup_process(pid)

    @classmethod
    def _try_cleanup_process(cls, pid):
        """Reap ``pid`` if it has exited and schedule its exit callback.

        Uses a non-blocking ``waitpid``. Returns without doing anything if
        the child is still running, or if ``waitpid`` reports ``ECHILD``.
        Any other ``OSError`` propagates to the caller.
        """
        try:
            ret_pid, status = os.waitpid(pid, os.WNOHANG)
        except OSError as e:
            if errno_from_exception(e) == errno.ECHILD:
                return
            # Any other error is unexpected; surface it rather than
            # continuing with ``ret_pid``/``status`` unbound.
            raise
        if ret_pid == 0:
            return  # still running
        assert ret_pid == pid
        subproc = cls._waiting.pop(pid)
        subproc.io_loop.add_callback_from_signal(
            subproc._set_returncode, status)

    def _set_returncode(self, status):
        """Decode a wait status into ``returncode`` and fire the exit callback once.

        A process killed by a signal gets a negative return code
        (``-signum``), matching ``subprocess.Popen.returncode``.
        """
        if os.WIFSIGNALED(status):
            self.returncode = -os.WTERMSIG(status)
        else:
            assert os.WIFEXITED(status)
            self.returncode = os.WEXITSTATUS(status)
        if self._exit_callback:
            # Clear before calling so the callback can never run twice.
            callback = self._exit_callback
            self._exit_callback = None
            callback(self.returncode)


rosbridge_server
Author(s): Jonathan Mace
autogenerated on Thu Aug 27 2015 14:50:40