00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 """Miscellaneous network utility code."""
00018
00019 from __future__ import absolute_import, division, with_statement
00020
00021 import errno
00022 import logging
00023 import os
00024 import socket
00025 import stat
00026
00027 from tornado import process
00028 from tornado.ioloop import IOLoop
00029 from tornado.iostream import IOStream, SSLIOStream
00030 from tornado.platform.auto import set_close_exec
00031
00032 try:
00033 import ssl
00034 except ImportError:
00035 ssl = None
00036
00037
00038 class TCPServer(object):
00039 r"""A non-blocking, single-threaded TCP server.
00040
00041 To use `TCPServer`, define a subclass which overrides the `handle_stream`
00042 method.
00043
00044 `TCPServer` can serve SSL traffic with Python 2.6+ and OpenSSL.
00045 To make this server serve SSL traffic, send the ssl_options dictionary
00046 argument with the arguments required for the `ssl.wrap_socket` method,
00047 including "certfile" and "keyfile"::
00048
00049 TCPServer(ssl_options={
00050 "certfile": os.path.join(data_dir, "mydomain.crt"),
00051 "keyfile": os.path.join(data_dir, "mydomain.key"),
00052 })
00053
00054 `TCPServer` initialization follows one of three patterns:
00055
00056 1. `listen`: simple single-process::
00057
00058 server = TCPServer()
00059 server.listen(8888)
00060 IOLoop.instance().start()
00061
00062 2. `bind`/`start`: simple multi-process::
00063
00064 server = TCPServer()
00065 server.bind(8888)
00066 server.start(0) # Forks multiple sub-processes
00067 IOLoop.instance().start()
00068
00069 When using this interface, an `IOLoop` must *not* be passed
00070 to the `TCPServer` constructor. `start` will always start
00071 the server on the default singleton `IOLoop`.
00072
00073 3. `add_sockets`: advanced multi-process::
00074
00075 sockets = bind_sockets(8888)
00076 tornado.process.fork_processes(0)
00077 server = TCPServer()
00078 server.add_sockets(sockets)
00079 IOLoop.instance().start()
00080
00081 The `add_sockets` interface is more complicated, but it can be
00082 used with `tornado.process.fork_processes` to give you more
00083 flexibility in when the fork happens. `add_sockets` can
00084 also be used in single-process servers if you want to create
00085 your listening sockets in some way other than
00086 `bind_sockets`.
00087 """
00088 def __init__(self, io_loop=None, ssl_options=None):
00089 self.io_loop = io_loop
00090 self.ssl_options = ssl_options
00091 self._sockets = {}
00092 self._pending_sockets = []
00093 self._started = False
00094
00095 def listen(self, port, address=""):
00096 """Starts accepting connections on the given port.
00097
00098 This method may be called more than once to listen on multiple ports.
00099 `listen` takes effect immediately; it is not necessary to call
00100 `TCPServer.start` afterwards. It is, however, necessary to start
00101 the `IOLoop`.
00102 """
00103 sockets = bind_sockets(port, address=address)
00104 self.add_sockets(sockets)
00105
00106 def add_sockets(self, sockets):
00107 """Makes this server start accepting connections on the given sockets.
00108
00109 The ``sockets`` parameter is a list of socket objects such as
00110 those returned by `bind_sockets`.
00111 `add_sockets` is typically used in combination with that
00112 method and `tornado.process.fork_processes` to provide greater
00113 control over the initialization of a multi-process server.
00114 """
00115 if self.io_loop is None:
00116 self.io_loop = IOLoop.instance()
00117
00118 for sock in sockets:
00119 self._sockets[sock.fileno()] = sock
00120 add_accept_handler(sock, self._handle_connection,
00121 io_loop=self.io_loop)
00122
00123 def add_socket(self, socket):
00124 """Singular version of `add_sockets`. Takes a single socket object."""
00125 self.add_sockets([socket])
00126
00127 def bind(self, port, address=None, family=socket.AF_UNSPEC, backlog=128):
00128 """Binds this server to the given port on the given address.
00129
00130 To start the server, call `start`. If you want to run this server
00131 in a single process, you can call `listen` as a shortcut to the
00132 sequence of `bind` and `start` calls.
00133
00134 Address may be either an IP address or hostname. If it's a hostname,
00135 the server will listen on all IP addresses associated with the
00136 name. Address may be an empty string or None to listen on all
00137 available interfaces. Family may be set to either ``socket.AF_INET``
00138 or ``socket.AF_INET6`` to restrict to ipv4 or ipv6 addresses, otherwise
00139 both will be used if available.
00140
00141 The ``backlog`` argument has the same meaning as for
00142 `socket.listen`.
00143
00144 This method may be called multiple times prior to `start` to listen
00145 on multiple ports or interfaces.
00146 """
00147 sockets = bind_sockets(port, address=address, family=family,
00148 backlog=backlog)
00149 if self._started:
00150 self.add_sockets(sockets)
00151 else:
00152 self._pending_sockets.extend(sockets)
00153
00154 def start(self, num_processes=1):
00155 """Starts this server in the IOLoop.
00156
00157 By default, we run the server in this process and do not fork any
00158 additional child process.
00159
00160 If num_processes is ``None`` or <= 0, we detect the number of cores
00161 available on this machine and fork that number of child
00162 processes. If num_processes is given and > 1, we fork that
00163 specific number of sub-processes.
00164
00165 Since we use processes and not threads, there is no shared memory
00166 between any server code.
00167
00168 Note that multiple processes are not compatible with the autoreload
00169 module (or the ``debug=True`` option to `tornado.web.Application`).
00170 When using multiple processes, no IOLoops can be created or
00171 referenced until after the call to ``TCPServer.start(n)``.
00172 """
00173 assert not self._started
00174 self._started = True
00175 if num_processes != 1:
00176 process.fork_processes(num_processes)
00177 sockets = self._pending_sockets
00178 self._pending_sockets = []
00179 self.add_sockets(sockets)
00180
00181 def stop(self):
00182 """Stops listening for new connections.
00183
00184 Requests currently in progress may still continue after the
00185 server is stopped.
00186 """
00187 for fd, sock in self._sockets.iteritems():
00188 self.io_loop.remove_handler(fd)
00189 sock.close()
00190
00191 def handle_stream(self, stream, address):
00192 """Override to handle a new `IOStream` from an incoming connection."""
00193 raise NotImplementedError()
00194
00195 def _handle_connection(self, connection, address):
00196 if self.ssl_options is not None:
00197 assert ssl, "Python 2.6+ and OpenSSL required for SSL"
00198 try:
00199 connection = ssl.wrap_socket(connection,
00200 server_side=True,
00201 do_handshake_on_connect=False,
00202 **self.ssl_options)
00203 except ssl.SSLError, err:
00204 if err.args[0] == ssl.SSL_ERROR_EOF:
00205 return connection.close()
00206 else:
00207 raise
00208 except socket.error, err:
00209 if err.args[0] == errno.ECONNABORTED:
00210 return connection.close()
00211 else:
00212 raise
00213 try:
00214 if self.ssl_options is not None:
00215 stream = SSLIOStream(connection, io_loop=self.io_loop)
00216 else:
00217 stream = IOStream(connection, io_loop=self.io_loop)
00218 self.handle_stream(stream, address)
00219 except Exception:
00220 logging.error("Error in connection callback", exc_info=True)
00221
00222
00223 def bind_sockets(port, address=None, family=socket.AF_UNSPEC, backlog=128):
00224 """Creates listening sockets bound to the given port and address.
00225
00226 Returns a list of socket objects (multiple sockets are returned if
00227 the given address maps to multiple IP addresses, which is most common
00228 for mixed IPv4 and IPv6 use).
00229
00230 Address may be either an IP address or hostname. If it's a hostname,
00231 the server will listen on all IP addresses associated with the
00232 name. Address may be an empty string or None to listen on all
00233 available interfaces. Family may be set to either socket.AF_INET
00234 or socket.AF_INET6 to restrict to ipv4 or ipv6 addresses, otherwise
00235 both will be used if available.
00236
00237 The ``backlog`` argument has the same meaning as for
00238 ``socket.listen()``.
00239 """
00240 sockets = []
00241 if address == "":
00242 address = None
00243 flags = socket.AI_PASSIVE
00244 if hasattr(socket, "AI_ADDRCONFIG"):
00245
00246
00247
00248
00249 flags |= socket.AI_ADDRCONFIG
00250 for res in set(socket.getaddrinfo(address, port, family, socket.SOCK_STREAM,
00251 0, flags)):
00252 af, socktype, proto, canonname, sockaddr = res
00253 sock = socket.socket(af, socktype, proto)
00254 set_close_exec(sock.fileno())
00255 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
00256 if af == socket.AF_INET6:
00257
00258
00259
00260
00261
00262
00263
00264
00265 if hasattr(socket, "IPPROTO_IPV6"):
00266 sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
00267 sock.setblocking(0)
00268 sock.bind(sockaddr)
00269 sock.listen(backlog)
00270 sockets.append(sock)
00271 return sockets
00272
00273 if hasattr(socket, 'AF_UNIX'):
00274 def bind_unix_socket(file, mode=0600, backlog=128):
00275 """Creates a listening unix socket.
00276
00277 If a socket with the given name already exists, it will be deleted.
00278 If any other file with that name exists, an exception will be
00279 raised.
00280
00281 Returns a socket object (not a list of socket objects like
00282 `bind_sockets`)
00283 """
00284 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
00285 set_close_exec(sock.fileno())
00286 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
00287 sock.setblocking(0)
00288 try:
00289 st = os.stat(file)
00290 except OSError, err:
00291 if err.errno != errno.ENOENT:
00292 raise
00293 else:
00294 if stat.S_ISSOCK(st.st_mode):
00295 os.remove(file)
00296 else:
00297 raise ValueError("File %s exists and is not a socket", file)
00298 sock.bind(file)
00299 os.chmod(file, mode)
00300 sock.listen(backlog)
00301 return sock
00302
00303
00304 def add_accept_handler(sock, callback, io_loop=None):
00305 """Adds an ``IOLoop`` event handler to accept new connections on ``sock``.
00306
00307 When a connection is accepted, ``callback(connection, address)`` will
00308 be run (``connection`` is a socket object, and ``address`` is the
00309 address of the other end of the connection). Note that this signature
00310 is different from the ``callback(fd, events)`` signature used for
00311 ``IOLoop`` handlers.
00312 """
00313 if io_loop is None:
00314 io_loop = IOLoop.instance()
00315
00316 def accept_handler(fd, events):
00317 while True:
00318 try:
00319 connection, address = sock.accept()
00320 except socket.error, e:
00321 if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
00322 return
00323 raise
00324 callback(connection, address)
00325 io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ)