00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 """Utilities for working with multiple processes, including both forking
00018 the server into multiple processes and managing subprocesses.
00019 """
00020
00021 from __future__ import absolute_import, division, print_function, with_statement
00022
00023 import errno
00024 import os
00025 import signal
00026 import subprocess
00027 import sys
00028 import time
00029
00030 from binascii import hexlify
00031
00032 from tornado import ioloop
00033 from tornado.iostream import PipeIOStream
00034 from tornado.log import gen_log
00035 from tornado.platform.auto import set_close_exec
00036 from tornado import stack_context
00037 from tornado.util import errno_from_exception
00038
try:
    # Used only by cpu_count(); may be unavailable on some platforms or
    # stripped-down installations.
    import multiprocessing
except ImportError:
    # this module is unavailable; fall back to single-process behavior
    multiprocessing = None

try:
    long  # exists on Python 2
except NameError:
    long = int  # Python 3: int subsumes long
00049
00050
def cpu_count():
    """Returns the number of processors on this machine.

    Tries ``multiprocessing.cpu_count()`` first, then the POSIX
    ``sysconf`` interface; if neither works, logs an error and
    assumes a single processor.
    """
    if multiprocessing is None:
        return 1
    try:
        return multiprocessing.cpu_count()
    except NotImplementedError:
        pass
    try:
        return os.sysconf("SC_NPROCESSORS_CONF")
    except (AttributeError, ValueError):
        # os.sysconf is missing entirely on some platforms (AttributeError,
        # e.g. Windows) and may reject the configuration name (ValueError).
        pass
    gen_log.error("Could not detect number of processors; assuming 1")
    return 1
00065
00066
def _reseed_random():
    """Seed the ``random`` module afresh in a forked child.

    Does nothing when ``random`` has never been imported, since an
    unimported module cannot have inherited the parent's state.
    """
    if 'random' not in sys.modules:
        return
    import random
    try:
        # 16 bytes of urandom make a high-quality 128-bit seed.
        fresh_seed = long(hexlify(os.urandom(16)), 16)
    except NotImplementedError:
        # urandom is unavailable here; mix the clock with the pid instead.
        fresh_seed = int(time.time() * 1000) ^ os.getpid()
    random.seed(fresh_seed)
00079
00080
def _pipe_cloexec():
    """Create a pipe whose ends are both marked close-on-exec.

    Returns the ``(read_fd, write_fd)`` pair from ``os.pipe``.
    """
    read_fd, write_fd = os.pipe()
    for fd in (read_fd, write_fd):
        set_close_exec(fd)
    return read_fd, write_fd
00086
00087
00088 _task_id = None
00089
00090
def fork_processes(num_processes, max_restarts=100):
    """Starts multiple worker processes.

    If ``num_processes`` is None or <= 0, we detect the number of cores
    available on this machine and fork that number of child
    processes. If ``num_processes`` is given and > 0, we fork that
    specific number of sub-processes.

    Since we use processes and not threads, there is no shared memory
    between any server code.

    Note that multiple processes are not compatible with the autoreload
    module (or the ``autoreload=True`` option to `tornado.web.Application`
    which defaults to True when ``debug=True``).
    When using multiple processes, no IOLoops can be created or
    referenced until after the call to ``fork_processes``.

    In each child process, ``fork_processes`` returns its *task id*, a
    number between 0 (inclusive) and ``num_processes`` (exclusive).
    Processes that exit abnormally (due to a signal or non-zero exit
    status) are restarted with the same id (up to ``max_restarts``
    times). In the parent process, ``fork_processes`` calls
    ``sys.exit(0)`` once all child processes have exited normally, but
    will otherwise only exit by throwing an exception.
    """
    global _task_id
    assert _task_id is None
    if num_processes is None or num_processes <= 0:
        num_processes = cpu_count()
    # Forking after an IOLoop has been created would share its file
    # descriptors between processes, so refuse to proceed.
    if ioloop.IOLoop.initialized():
        raise RuntimeError("Cannot run in multiple processes: IOLoop instance "
                           "has already been initialized. You cannot call "
                           "IOLoop.instance() before calling start_processes()")
    gen_log.info("Starting %d processes", num_processes)
    children = {}  # maps child pid -> task id

    def start_child(i):
        # Fork one worker; returns the task id in the child, None in the parent.
        pid = os.fork()
        if pid == 0:
            # child process: give it an independent random state before
            # handing control back to the caller
            _reseed_random()
            global _task_id
            _task_id = i
            return i
        else:
            children[pid] = i
            return None
    for i in range(num_processes):
        id = start_child(i)
        if id is not None:
            # We are in a child process: return immediately so the caller's
            # server loop runs here.
            return id
    # Parent: supervise children, restarting abnormal exits.
    num_restarts = 0
    while children:
        try:
            pid, status = os.wait()
        except OSError as e:
            # Interrupted by a signal: retry the wait.
            if errno_from_exception(e) == errno.EINTR:
                continue
            raise
        if pid not in children:
            continue
        id = children.pop(pid)
        if os.WIFSIGNALED(status):
            gen_log.warning("child %d (pid %d) killed by signal %d, restarting",
                            id, pid, os.WTERMSIG(status))
        elif os.WEXITSTATUS(status) != 0:
            gen_log.warning("child %d (pid %d) exited with status %d, restarting",
                            id, pid, os.WEXITSTATUS(status))
        else:
            gen_log.info("child %d (pid %d) exited normally", id, pid)
            continue
        num_restarts += 1
        if num_restarts > max_restarts:
            raise RuntimeError("Too many child restarts, giving up")
        new_id = start_child(id)
        if new_id is not None:
            # We are in a freshly restarted child process.
            return new_id
    # All children exited cleanly.  Exit the parent rather than returning,
    # so that code after fork_processes only runs in child processes.
    sys.exit(0)
00173
00174
def task_id():
    """Return this process's task id, if it has one.

    Only processes created by `fork_processes` have a task id; in any
    other process this returns None.
    """
    # Reading a module-level name needs no ``global`` declaration.
    return _task_id
00182
00183
class Subprocess(object):
    """Wraps ``subprocess.Popen`` with IOStream support.

    The constructor is the same as ``subprocess.Popen`` with the following
    additions:

    * ``stdin``, ``stdout``, and ``stderr`` may have the value
      ``tornado.process.Subprocess.STREAM``, which will make the corresponding
      attribute of the resulting Subprocess a `.PipeIOStream`.
    * A new keyword argument ``io_loop`` may be used to pass in an IOLoop.
    """
    # Sentinel value for stdin/stdout/stderr requesting a PipeIOStream.
    STREAM = object()

    _initialized = False  # True once the SIGCHLD handler has been installed
    _waiting = {}  # maps pid -> Subprocess instance awaiting exit notification

    def __init__(self, *args, **kwargs):
        self.io_loop = kwargs.pop('io_loop', None) or ioloop.IOLoop.current()
        # pipe_fds collects every fd we create so they can all be closed if
        # Popen fails; to_close collects the child-side end of each pipe,
        # which the parent closes once the child has inherited it.
        pipe_fds = []
        to_close = []
        if kwargs.get('stdin') is Subprocess.STREAM:
            in_r, in_w = _pipe_cloexec()
            kwargs['stdin'] = in_r
            pipe_fds.extend((in_r, in_w))
            to_close.append(in_r)
            self.stdin = PipeIOStream(in_w, io_loop=self.io_loop)
        if kwargs.get('stdout') is Subprocess.STREAM:
            out_r, out_w = _pipe_cloexec()
            kwargs['stdout'] = out_w
            pipe_fds.extend((out_r, out_w))
            to_close.append(out_w)
            self.stdout = PipeIOStream(out_r, io_loop=self.io_loop)
        if kwargs.get('stderr') is Subprocess.STREAM:
            err_r, err_w = _pipe_cloexec()
            kwargs['stderr'] = err_w
            pipe_fds.extend((err_r, err_w))
            to_close.append(err_w)
            self.stderr = PipeIOStream(err_r, io_loop=self.io_loop)
        try:
            self.proc = subprocess.Popen(*args, **kwargs)
        except:
            # Popen failed: close every pipe fd we created before re-raising.
            for fd in pipe_fds:
                os.close(fd)
            raise
        for fd in to_close:
            os.close(fd)
        # Forward attributes we did not wrap above (pid, plus whichever of
        # the three streams were not requested as STREAM) from Popen.
        for attr in ['stdin', 'stdout', 'stderr', 'pid']:
            if not hasattr(self, attr):
                setattr(self, attr, getattr(self.proc, attr))
        self._exit_callback = None
        self.returncode = None

    def set_exit_callback(self, callback):
        """Runs ``callback`` when this process exits.

        The callback takes one argument, the return code of the process.

        This method uses a ``SIGCHLD`` handler, which is a global setting
        and may conflict if you have other libraries trying to handle the
        same signal. If you are using more than one ``IOLoop`` it may
        be necessary to call `Subprocess.initialize` first to designate
        one ``IOLoop`` to run the signal handlers.

        In many cases a close callback on the stdout or stderr streams
        can be used as an alternative to an exit callback if the
        signal handler is causing a problem.
        """
        self._exit_callback = stack_context.wrap(callback)
        Subprocess.initialize(self.io_loop)
        Subprocess._waiting[self.pid] = self
        # The child may already have exited before the handler was
        # registered, so poll it once immediately.
        Subprocess._try_cleanup_process(self.pid)

    @classmethod
    def initialize(cls, io_loop=None):
        """Initializes the ``SIGCHLD`` handler.

        The signal handler is run on an `.IOLoop` to avoid locking issues.
        Note that the `.IOLoop` used for signal handling need not be the
        same one used by individual Subprocess objects (as long as the
        ``IOLoops`` are each running in separate threads).
        """
        if cls._initialized:
            return
        if io_loop is None:
            io_loop = ioloop.IOLoop.current()
        # Keep the previous handler so uninitialize() can restore it.
        cls._old_sigchld = signal.signal(
            signal.SIGCHLD,
            lambda sig, frame: io_loop.add_callback_from_signal(cls._cleanup))
        cls._initialized = True

    @classmethod
    def uninitialize(cls):
        """Removes the ``SIGCHLD`` handler."""
        if not cls._initialized:
            return
        signal.signal(signal.SIGCHLD, cls._old_sigchld)
        cls._initialized = False

    @classmethod
    def _cleanup(cls):
        # Runs on the designated IOLoop after SIGCHLD: reap any waited-on
        # children that have exited.  Iterate over a copy of the keys since
        # _try_cleanup_process mutates _waiting.
        for pid in list(cls._waiting.keys()):
            cls._try_cleanup_process(pid)

    @classmethod
    def _try_cleanup_process(cls, pid):
        # Non-blocking reap of a single child; a no-op if it is still running
        # or has already been reaped elsewhere (ECHILD).
        try:
            ret_pid, status = os.waitpid(pid, os.WNOHANG)
        except OSError as e:
            if errno_from_exception(e) == errno.ECHILD:
                return
            # NOTE(review): any other OSError falls through to the ret_pid
            # check below with ret_pid unbound (UnboundLocalError) — confirm
            # whether this branch should ``raise`` instead.
        if ret_pid == 0:
            # Child is still running.
            return
        assert ret_pid == pid
        subproc = cls._waiting.pop(pid)
        # Deliver the status on the subprocess's own IOLoop, which may be a
        # different loop than the one running this signal callback.
        subproc.io_loop.add_callback_from_signal(
            subproc._set_returncode, status)

    def _set_returncode(self, status):
        # Translate an os.wait()-style status word into a Popen-style
        # returncode: negative signal number if killed, exit status otherwise.
        if os.WIFSIGNALED(status):
            self.returncode = -os.WTERMSIG(status)
        else:
            assert os.WIFEXITED(status)
            self.returncode = os.WEXITSTATUS(status)
        if self._exit_callback:
            # Clear the callback before invoking it so it fires at most once.
            callback = self._exit_callback
            self._exit_callback = None
            callback(self.returncode)