Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 """Utilities for working with multiple processes."""
00018
00019 from __future__ import absolute_import, division, with_statement
00020
00021 import errno
00022 import logging
00023 import os
00024 import sys
00025 import time
00026
00027 from binascii import hexlify
00028
00029 from tornado import ioloop
00030
00031 try:
00032 import multiprocessing
00033 except ImportError:
00034 multiprocessing = None
00035
00036
def cpu_count():
    """Returns the number of processors on this machine.

    ``multiprocessing.cpu_count()`` is tried first when the
    ``multiprocessing`` module could be imported; otherwise (or if it
    raises ``NotImplementedError``) ``os.sysconf("SC_NPROCESSORS_CONF")``
    is consulted.  If neither source yields a positive count, an error is
    logged and 1 is returned.
    """
    if multiprocessing is not None:
        try:
            return multiprocessing.cpu_count()
        except NotImplementedError:
            pass
    try:
        count = os.sysconf("SC_NPROCESSORS_CONF")
    except (AttributeError, ValueError, OSError):
        # AttributeError: os.sysconf is missing on this platform.
        # ValueError: the configuration name is not known.
        # OSError: the name is known but not supported by the host.
        pass
    else:
        # sysconf reports -1 for values the system does not define; never
        # hand a non-positive process count back to the caller.
        if count > 0:
            return count
    logging.error("Could not detect number of processors; assuming 1")
    return 1
00050
00051
def _reseed_random():
    """Reseeds the ``random`` module if it has already been imported.

    ``fork_processes`` calls this in each freshly forked child so that the
    child does not keep using the generator state it inherited from the
    parent.  Nothing happens when ``random`` was never imported.
    """
    if 'random' not in sys.modules:
        return
    import random
    try:
        # 16 bytes of OS entropy interpreted as one big integer.  ``int``
        # (rather than the Python 2-only ``long``) promotes automatically
        # on Python 2 and is the only spelling available on Python 3.
        seed = int(hexlify(os.urandom(16)), 16)
    except NotImplementedError:
        # No OS randomness source: mix the pid into a millisecond
        # timestamp so children forked at the same moment still differ.
        seed = int(time.time() * 1000) ^ os.getpid()
    random.seed(seed)
00064
00065
# Task id of the current process.  Stays None unless fork_processes()
# assigns a child's index to it after forking; read back via task_id().
_task_id = None
00067
00068
def fork_processes(num_processes, max_restarts=100):
    """Starts multiple worker processes.

    If ``num_processes`` is None or <= 0, we detect the number of cores
    available on this machine and fork that number of child
    processes. If ``num_processes`` is given and > 0, we fork that
    specific number of sub-processes.

    Since we use processes and not threads, there is no shared memory
    between any server code.

    Note that multiple processes are not compatible with the autoreload
    module (or the debug=True option to `tornado.web.Application`).
    When using multiple processes, no IOLoops can be created or
    referenced until after the call to ``fork_processes``.

    In each child process, ``fork_processes`` returns its *task id*, a
    number from 0 up to (but not including) ``num_processes``.  Processes
    that exit abnormally (due to a signal or non-zero exit status) are
    restarted with the same id (up to ``max_restarts`` times in total).

    The parent process never returns from this function: once every
    child has exited normally it calls ``sys.exit(0)``, and otherwise it
    only leaves by raising an exception (``RuntimeError`` when an IOLoop
    already exists or when ``max_restarts`` is exceeded, or an ``OSError``
    from ``os.wait`` other than EINTR).
    """
    global _task_id
    assert _task_id is None
    if num_processes is None or num_processes <= 0:
        num_processes = cpu_count()
    if ioloop.IOLoop.initialized():
        raise RuntimeError("Cannot run in multiple processes: IOLoop instance "
                           "has already been initialized. You cannot call "
                           "IOLoop.instance() before calling fork_processes()")
    logging.info("Starting %d processes", num_processes)
    # Maps the pid of every live child to the task id it was started with.
    children = {}

    def start_child(i):
        """Forks one worker; returns ``i`` in the child, None in the parent."""
        global _task_id
        pid = os.fork()
        if pid == 0:
            # Child process: pick a fresh random seed and record our id.
            _reseed_random()
            _task_id = i
            return i
        # Parent process: remember which task this pid is running.
        children[pid] = i
        return None

    for i in range(num_processes):
        child_id = start_child(i)
        if child_id is not None:
            # We are inside a child; hand control back to the caller.
            return child_id

    num_restarts = 0
    while children:
        try:
            pid, status = os.wait()
        except OSError:
            # sys.exc_info() instead of "except ... as e" / "except ..., e"
            # so the module parses on every Python version.
            e = sys.exc_info()[1]
            if e.errno == errno.EINTR:
                # Interrupted by a signal; just wait again.
                continue
            raise
        if pid not in children:
            # Some process we did not start here; ignore it.
            continue
        task = children.pop(pid)
        if os.WIFSIGNALED(status):
            logging.warning("child %d (pid %d) killed by signal %d, restarting",
                            task, pid, os.WTERMSIG(status))
        elif os.WEXITSTATUS(status) != 0:
            logging.warning("child %d (pid %d) exited with status %d, restarting",
                            task, pid, os.WEXITSTATUS(status))
        else:
            logging.info("child %d (pid %d) exited normally", task, pid)
            continue
        num_restarts += 1
        if num_restarts > max_restarts:
            raise RuntimeError("Too many child restarts, giving up")
        new_id = start_child(task)
        if new_id is not None:
            # We are inside the restarted child.
            return new_id

    # Every child exited cleanly.  Exit the parent rather than returning
    # to the caller, which would otherwise carry on as if it were a worker
    # unless it checked the return value.
    sys.exit(0)
00150
00151
def task_id():
    """Returns the task id of this process, or None.

    The id is the one assigned by `fork_processes`; a process that was
    not created by `fork_processes` gets None.
    """
    # Read-only access to the module-level value, so no ``global`` needed.
    return _task_id