00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 from __future__ import absolute_import, division, print_function, with_statement
00017
00018 import logging
00019 import re
00020 import socket
00021 import sys
00022 import traceback
00023
00024 from tornado.concurrent import Future, return_future, ReturnValueIgnoredError
00025 from tornado.escape import utf8, to_unicode
00026 from tornado import gen
00027 from tornado.iostream import IOStream
00028 from tornado import stack_context
00029 from tornado.tcpserver import TCPServer
00030 from tornado.testing import AsyncTestCase, LogTrapTestCase, bind_unused_port, gen_test
00031
00032
00033 try:
00034 from concurrent import futures
00035 except ImportError:
00036 futures = None
00037
00038
class ReturnFutureTest(AsyncTestCase):
    """Tests for the ``return_future`` decorator.

    The decorated helpers defined first are fixtures. Each one finishes in
    a different way: calling the callback synchronously, scheduling it on
    the IOLoop, raising synchronously, raising from an IOLoop callback,
    returning a value, or calling the callback without arguments.
    """

    @return_future
    def sync_future(self, callback):
        # Invokes the callback before returning to the caller.
        callback(42)

    @return_future
    def async_future(self, callback):
        # Hands the callback to the IOLoop instead of calling it directly.
        self.io_loop.add_callback(callback, 42)

    @return_future
    def immediate_failure(self, callback):
        # Raises ZeroDivisionError without ever touching the callback.
        1 / 0

    @return_future
    def delayed_failure(self, callback):
        # Schedules a failing function on the IOLoop; ``callback`` is
        # never invoked.
        self.io_loop.add_callback(lambda: 1 / 0)

    @return_future
    def return_value(self, callback):
        # Returns a value and never invokes ``callback``;
        # test_return_value expects ReturnValueIgnoredError from this.
        return 42

    @return_future
    def no_result_future(self, callback):
        # Invokes the callback with no arguments at all.
        callback()

    def test_immediate_failure(self):
        # The synchronous error must reach the caller directly.
        self.assertRaises(ZeroDivisionError,
                          self.immediate_failure, callback=self.stop)
        # Only the timeout below stops the loop, so wait() is expected to
        # come back with None rather than with a callback argument.
        deadline = self.io_loop.time() + 0.05
        self.io_loop.add_timeout(deadline, self.stop)
        outcome = self.wait()
        self.assertIs(outcome, None)

    def test_return_value(self):
        self.assertRaises(ReturnValueIgnoredError,
                          self.return_value, callback=self.stop)

    def test_callback_kw(self):
        # Callback supplied by keyword: both the callback and the returned
        # future must see the value.
        fut = self.sync_future(callback=self.stop)
        delivered = self.wait()
        self.assertEqual(delivered, 42)
        self.assertEqual(fut.result(), 42)

    def test_callback_positional(self):
        # Same as test_callback_kw, but the callback is given positionally.
        fut = self.sync_future(self.stop)
        delivered = self.wait()
        self.assertEqual(delivered, 42)
        self.assertEqual(fut.result(), 42)

    def test_no_callback(self):
        fut = self.sync_future()
        self.assertEqual(fut.result(), 42)

    def test_none_callback_kw(self):
        # An explicit ``callback=None`` keyword is passed here.
        fut = self.sync_future(callback=None)
        self.assertEqual(fut.result(), 42)

    def test_none_callback_pos(self):
        # An explicit positional ``None`` callback is passed here.
        fut = self.sync_future(None)
        self.assertEqual(fut.result(), 42)

    def test_async_future(self):
        pending = self.async_future()
        # Nothing has run on the IOLoop yet, so the future is unresolved.
        self.assertFalse(pending.done())
        self.io_loop.add_future(pending, self.stop)
        resolved = self.wait()
        self.assertIs(pending, resolved)
        self.assertEqual(pending.result(), 42)

    @gen_test
    def test_async_future_gen(self):
        value = yield self.async_future()
        self.assertEqual(value, 42)

    def test_delayed_failure(self):
        failing = self.delayed_failure()
        self.io_loop.add_future(failing, self.stop)
        resolved = self.wait()
        self.assertIs(failing, resolved)
        # The error raised on the IOLoop must be stored on the future.
        self.assertRaises(ZeroDivisionError, failing.result)

    def test_kw_only_callback(self):
        # The wrapped function only accepts ``**kwargs`` and looks the
        # callback up by name.
        @return_future
        def kw_only(**kwargs):
            kwargs['callback'](42)
        fut = kw_only()
        self.assertEqual(fut.result(), 42)

    def test_error_in_callback(self):
        self.sync_future(callback=lambda future: 1 / 0)
        # The error raised inside the callback is expected to surface
        # from wait().
        with self.assertRaises(ZeroDivisionError):
            self.wait()

    def test_no_result_future(self):
        fut = self.no_result_future(self.stop)
        outcome = self.wait()
        self.assertIs(outcome, None)
        # No particular value is asserted; this only has to not raise.
        fut.result()

    def test_no_result_future_callback(self):
        fut = self.no_result_future(callback=lambda: self.stop())
        outcome = self.wait()
        self.assertIs(outcome, None)
        # No particular value is asserted; this only has to not raise.
        fut.result()

    @gen_test
    def test_future_traceback(self):
        @return_future
        @gen.engine
        def failing(callback):
            yield gen.Task(self.io_loop.add_callback)
            try:
                1 / 0
            except ZeroDivisionError:
                # Remember the first traceback entry seen at the point of
                # failure so the outer check can look for it.
                origin = sys.exc_info()[2]
                self.expected_frame = traceback.extract_tb(
                    origin, limit=1)[0]
                raise
        try:
            yield failing()
            self.fail("didn't get expected exception")
        except ZeroDivisionError:
            # The traceback seen by the yielding caller must still
            # contain the entry recorded inside ``failing``.
            frames = traceback.extract_tb(sys.exc_info()[2])
            self.assertIn(self.expected_frame, frames)
00172
00173
00174
00175
00176
class CapServer(TCPServer):
    """TCP server that answers one line per connection with its upper case.

    The reply is ``ok<TAB><upper-cased request>`` unless the request is
    already equal to its upper-cased form, in which case the reply is
    ``error<TAB>already capitalized``. The stream is closed after replying.
    """

    def handle_stream(self, stream, address):
        """Keep the new stream and start reading the first line from it."""
        logging.info("handle_stream")
        self.stream = stream
        stream.read_until(b"\n", self.handle_read)

    def handle_read(self, data):
        """Write the reply for the request line ``data`` and hang up."""
        logging.info("handle_read")
        text = to_unicode(data)
        shouted = text.upper()
        if text == shouted:
            reply = b"error\talready capitalized\n"
        else:
            # No newline is appended here; presumably the request text
            # still ends with the one read_until stopped at - confirm.
            reply = utf8("ok\t%s" % shouted)
        self.stream.write(reply)
        self.stream.close()
00192
00193
class CapError(Exception):
    """Raised when the capitalization server reports a non-``ok`` status."""
00196
00197
class BaseCapClient(object):
    """Common state and reply parsing for the capitalization clients.

    This base class stores the connection parameters and decodes the
    server's ``status<TAB>message<NEWLINE>`` reply line.
    """

    def __init__(self, port, io_loop):
        """Remember the server ``port`` and the ``io_loop`` to use."""
        self.port = port
        self.io_loop = io_loop

    def process_response(self, data):
        """Parse one reply line and return its message.

        ``data`` is the raw reply line. The status is everything before
        the first tab; the message is the remainder of the line up to the
        newline.

        Returns the message when the status is ``ok``.

        Raises ``CapError`` carrying the message for any other status, and
        ``CapError`` describing the payload when it is not a well-formed
        reply line.
        """
        # The status must stop at the FIRST tab. A greedy ``.*`` would
        # absorb tabs that belong to the message and turn a successful
        # reply such as "ok\tA\tB\n" into a bogus error.
        match = re.match(r'([^\t]*)\t(.*)\n', to_unicode(data))
        if match is None:
            # Report protocol violations through the error type callers
            # already handle instead of failing on ``None.groups()``.
            raise CapError("malformed response: %r" % (data,))
        status, message = match.groups()
        if status == 'ok':
            return message
        raise CapError(message)
00209
00210
class ManualCapClient(BaseCapClient):
    """Client that creates and resolves its ``Future`` by hand."""

    def capitalize(self, request_data, callback=None):
        """Start a capitalization request and return a ``Future`` for it.

        When ``callback`` is given it is additionally invoked with the
        future's result once the future is done.
        """
        logging.info("capitalize")
        self.request_data = request_data
        server_address = ('127.0.0.1', self.port)
        self.stream = IOStream(socket.socket(), io_loop=self.io_loop)
        self.stream.connect(server_address, callback=self.handle_connect)
        self.future = Future()
        if callback is not None:
            # ``result()`` re-raises a stored exception, so a failed
            # request raises inside this wrapper instead of calling
            # ``callback``.
            def deliver(future):
                callback(future.result())
            self.future.add_done_callback(stack_context.wrap(deliver))
        return self.future

    def handle_connect(self):
        """Send the request line and wait for a one-line reply."""
        logging.info("handle_connect")
        request_line = utf8(self.request_data + "\n")
        self.stream.write(request_line)
        self.stream.read_until(b'\n', callback=self.handle_read)

    def handle_read(self, data):
        """Close the stream and resolve the future from the reply."""
        logging.info("handle_read")
        self.stream.close()
        try:
            message = self.process_response(data)
        except CapError as error:
            self.future.set_exception(error)
        else:
            self.future.set_result(message)
00236
00237
class DecoratorCapClient(BaseCapClient):
    """Callback-style client adapted with the ``return_future`` decorator."""

    @return_future
    def capitalize(self, request_data, callback):
        """Start a capitalization request; ``callback`` gets the result."""
        logging.info("capitalize")
        self.request_data = request_data
        server_address = ('127.0.0.1', self.port)
        self.stream = IOStream(socket.socket(), io_loop=self.io_loop)
        self.stream.connect(server_address, callback=self.handle_connect)
        # Kept on the instance so handle_read can report the outcome.
        self.callback = callback

    def handle_connect(self):
        """Send the request line and wait for a one-line reply."""
        logging.info("handle_connect")
        request_line = utf8(self.request_data + "\n")
        self.stream.write(request_line)
        self.stream.read_until(b'\n', callback=self.handle_read)

    def handle_read(self, data):
        """Close the stream and pass the parsed reply to the callback."""
        logging.info("handle_read")
        self.stream.close()
        message = self.process_response(data)
        self.callback(message)
00257
00258
class GeneratorCapClient(BaseCapClient):
    """Client written as a ``gen.engine`` generator under ``return_future``."""

    @return_future
    @gen.engine
    def capitalize(self, request_data, callback):
        """Run one capitalization request; ``callback`` gets the result."""
        logging.info('capitalize')
        conn = IOStream(socket.socket(), io_loop=self.io_loop)
        logging.info('connecting')
        server_address = ('127.0.0.1', self.port)
        yield gen.Task(conn.connect, server_address)
        conn.write(utf8(request_data + '\n'))
        logging.info('reading')
        reply = yield gen.Task(conn.read_until, b'\n')
        logging.info('returning')
        conn.close()
        message = self.process_response(reply)
        callback(message)
00273
00274
class ClientTestMixin(object):
    """Shared test cases run against each capitalization client class.

    Concrete test classes set ``client_class``. ``setUp`` starts a
    ``CapServer`` on an unused port and builds the client for that port.
    """

    def setUp(self):
        super(ClientTestMixin, self).setUp()
        listener, port = bind_unused_port()
        self.server = CapServer(io_loop=self.io_loop)
        self.server.add_sockets([listener])
        self.client = self.client_class(io_loop=self.io_loop, port=port)

    def tearDown(self):
        self.server.stop()
        super(ClientTestMixin, self).tearDown()

    def test_callback(self):
        # Success delivered through the callback argument.
        self.client.capitalize("hello", callback=self.stop)
        capitalized = self.wait()
        self.assertEqual(capitalized, "HELLO")

    def test_callback_error(self):
        # With a callback, the server's error is expected out of wait().
        self.client.capitalize("HELLO", callback=self.stop)
        with self.assertRaisesRegexp(CapError, "already capitalized"):
            self.wait()

    def test_future(self):
        # Success delivered through the returned future.
        pending = self.client.capitalize("hello")
        self.io_loop.add_future(pending, self.stop)
        self.wait()
        self.assertEqual(pending.result(), "HELLO")

    def test_future_error(self):
        # Without a callback, the error is stored on the returned future.
        pending = self.client.capitalize("HELLO")
        self.io_loop.add_future(pending, self.stop)
        self.wait()
        with self.assertRaisesRegexp(CapError, "already capitalized"):
            pending.result()

    def test_generator(self):
        # Success obtained by yielding the future inside a gen.engine
        # generator.
        @gen.engine
        def run():
            capitalized = yield self.client.capitalize("hello")
            self.assertEqual(capitalized, "HELLO")
            self.stop()
        run()
        self.wait()

    def test_generator_error(self):
        # The error must be raised at the yield inside the generator.
        @gen.engine
        def run():
            with self.assertRaisesRegexp(CapError, "already capitalized"):
                yield self.client.capitalize("HELLO")
            self.stop()
        run()
        self.wait()
00325
00326
class ManualClientTest(
        ClientTestMixin, AsyncTestCase, LogTrapTestCase):
    """Runs the ClientTestMixin cases against ``ManualCapClient``."""

    client_class = ManualCapClient
00329
00330
class DecoratorClientTest(
        ClientTestMixin, AsyncTestCase, LogTrapTestCase):
    """Runs the ClientTestMixin cases against ``DecoratorCapClient``."""

    client_class = DecoratorCapClient
00333
00334
class GeneratorClientTest(
        ClientTestMixin, AsyncTestCase, LogTrapTestCase):
    """Runs the ClientTestMixin cases against ``GeneratorCapClient``."""

    client_class = GeneratorCapClient