00001 from __future__ import absolute_import, division, print_function, with_statement
00002
00003 import os
00004 import signal
00005 import socket
00006 from subprocess import Popen
00007 import sys
00008 import time
00009
00010 from tornado.netutil import BlockingResolver, ThreadedResolver, is_valid_ip, bind_sockets
00011 from tornado.stack_context import ExceptionStackContext
00012 from tornado.testing import AsyncTestCase, gen_test
00013 from tornado.test.util import unittest, skipIfNoNetwork
00014
00015 try:
00016 from concurrent import futures
00017 except ImportError:
00018 futures = None
00019
00020 try:
00021 import pycares
00022 except ImportError:
00023 pycares = None
00024 else:
00025 from tornado.platform.caresresolver import CaresResolver
00026
00027 try:
00028 import twisted
00029 import twisted.names
00030 except ImportError:
00031 twisted = None
00032 else:
00033 from tornado.platform.twisted import TwistedResolver
00034
00035
class _ResolverTestMixin(object):
    """Resolver test cases shared by the concrete resolver test classes.

    The methods below read ``self.resolver`` and call ``self.stop``,
    ``self.wait`` and ``self.skipTest``; the classes in this file that mix
    this in derive from ``AsyncTestCase`` and assign ``self.resolver`` in
    their ``setUp``.
    """

    def skipOnCares(self):
        """Skip the running test when the resolver is a ``CaresResolver``.

        The resolver class is matched by name, so this method needs no
        reference to the optionally imported ``CaresResolver`` class.
        """
        resolver_name = self.resolver.__class__.__name__
        if resolver_name != 'CaresResolver':
            return
        self.skipTest("CaresResolver doesn't recognize fake NXDOMAIN")

    def test_localhost(self):
        # Callback interface: the resolver reports through self.stop and
        # the reported value is collected with self.wait().
        expected_entry = (socket.AF_INET, ('127.0.0.1', 80))
        self.resolver.resolve('localhost', 80, callback=self.stop)
        addresses = self.wait()
        self.assertIn(expected_entry, addresses)

    @gen_test
    def test_future_interface(self):
        # Future interface: the yielded value must contain the IPv4
        # loopback entry for the requested port.
        pending = self.resolver.resolve('localhost', 80, socket.AF_UNSPEC)
        addresses = yield pending
        self.assertIn((socket.AF_INET, ('127.0.0.1', 80)), addresses)

    def test_bad_host(self):
        self.skipOnCares()

        def forward_error(exc_typ, exc_val, exc_tb):
            # Pass the exception object to self.stop so that it becomes
            # the value checked after self.wait() below.
            self.stop(exc_val)
            return True

        with ExceptionStackContext(forward_error):
            self.resolver.resolve('an invalid domain', 80, callback=self.stop)

        outcome = self.wait()
        self.assertIsInstance(outcome, Exception)

    @gen_test
    def test_future_interface_bad_host(self):
        self.skipOnCares()
        # Both the call and the yield stay inside assertRaises, so an
        # error raised at either point satisfies the test.
        with self.assertRaises(Exception):
            pending = self.resolver.resolve('an invalid domain', 80,
                                            socket.AF_UNSPEC)
            yield pending
00076
00077
@skipIfNoNetwork
class BlockingResolverTest(AsyncTestCase, _ResolverTestMixin):
    """Runs the ``_ResolverTestMixin`` cases against ``BlockingResolver``."""

    def setUp(self):
        super(BlockingResolverTest, self).setUp()
        # self.io_loop is read only after the base-class setUp has run.
        loop = self.io_loop
        self.resolver = BlockingResolver(io_loop=loop)
00083
00084
@skipIfNoNetwork
@unittest.skipIf(futures is None, "futures module not present")
class ThreadedResolverTest(AsyncTestCase, _ResolverTestMixin):
    """Runs the ``_ResolverTestMixin`` cases against ``ThreadedResolver``.

    Skipped when the ``futures`` import at the top of the module failed.
    """

    def setUp(self):
        super(ThreadedResolverTest, self).setUp()
        # self.io_loop is read only after the base-class setUp has run.
        loop = self.io_loop
        self.resolver = ThreadedResolver(io_loop=loop)

    def tearDown(self):
        # Close the resolver first, then let the base class tear down.
        resolver = self.resolver
        resolver.close()
        super(ThreadedResolverTest, self).tearDown()
00095
00096
@skipIfNoNetwork
@unittest.skipIf(futures is None, "futures module not present")
@unittest.skipIf(sys.platform == 'win32', "preexec_fn not available on win32")
class ThreadedResolverImportTest(unittest.TestCase):
    """Checks that ``tornado.test.resolve_test_helper`` can be imported in
    a fresh interpreter and that the interpreter exits with status 0 within
    a fixed time limit.
    """

    def test_import(self):
        # Upper bound, in seconds, for both the polling loop below and the
        # alarm armed inside the child process.
        TIMEOUT = 5

        command = [
            sys.executable,
            '-c',
            'import tornado.test.resolve_test_helper']

        start = time.time()
        # preexec_fn runs in the child before the interpreter is exec'd; it
        # schedules SIGALRM for the child after TIMEOUT seconds.
        popen = Popen(command, preexec_fn=lambda: signal.alarm(TIMEOUT))
        try:
            while time.time() - start < TIMEOUT:
                return_code = popen.poll()
                if return_code is not None:
                    self.assertEqual(0, return_code)
                    return
                time.sleep(0.05)

            self.fail("import timed out")
        finally:
            # Never leave the child behind: on the timeout path (or if the
            # loop raised) it may still be running and has not been reaped.
            # Kill it only when it has not exited yet, then collect its
            # exit status so no zombie process remains.
            if popen.poll() is None:
                popen.kill()
            popen.wait()
00122
00123
@skipIfNoNetwork
@unittest.skipIf(pycares is None, "pycares module not present")
class CaresResolverTest(AsyncTestCase, _ResolverTestMixin):
    """Runs the ``_ResolverTestMixin`` cases against ``CaresResolver``.

    Skipped when the ``pycares`` import at the top of the module failed.
    """

    def setUp(self):
        super(CaresResolverTest, self).setUp()
        # self.io_loop is read only after the base-class setUp has run.
        loop = self.io_loop
        self.resolver = CaresResolver(io_loop=loop)
00130
00131
def _twisted_older_than(minimum):
    """Return True when the imported twisted is older than ``minimum``.

    ``minimum`` is a tuple of ints such as ``(12, 1)``.  The leading
    numeric components of ``twisted.__version__`` are compared as integers
    so that, for example, "9.0.0" sorts before "12.1"; a plain string
    comparison orders those the other way round.  When ``twisted`` is None
    or has no ``__version__`` the version is taken to be "0.0".
    """
    import re  # local import: this module does not import re at the top

    version = getattr(twisted, '__version__', '0.0')
    # Keep only as many numeric fields as ``minimum`` has, so suffixes
    # such as "rc1" or a patch level do not take part in the comparison.
    fields = re.findall(r'\d+', version)[:len(minimum)]
    return tuple(int(field) for field in fields) < tuple(minimum)


@skipIfNoNetwork
@unittest.skipIf(twisted is None, "twisted module not present")
@unittest.skipIf(_twisted_older_than((12, 1)), "old version of twisted")
class TwistedResolverTest(AsyncTestCase, _ResolverTestMixin):
    """Runs the ``_ResolverTestMixin`` cases against ``TwistedResolver``.

    Skipped when twisted is missing or older than 12.1.
    """

    def setUp(self):
        super(TwistedResolverTest, self).setUp()
        # self.io_loop is read only after the base-class setUp has run.
        self.resolver = TwistedResolver(io_loop=self.io_loop)
00139
00140
class IsValidIPTest(unittest.TestCase):
    """Checks ``is_valid_ip`` on literal addresses and on non-addresses."""

    def test_is_valid_ip(self):
        # Strings for which is_valid_ip must return a true value.
        accepted = (
            '127.0.0.1',
            '4.4.4.4',
            '::1',
            '2620:0:1cfe:face:b00c::3',
        )
        # Strings for which is_valid_ip must return a false value: host
        # names, trailing/leading junk, and empty or whitespace/control
        # input.
        rejected = (
            'www.google.com',
            'localhost',
            '4.4.4.4<',
            ' 127.0.0.1',
            '',
            ' ',
            '\n',
            '\x00',
        )

        # The repr is passed as the failure message so that a failing
        # entry can be identified even though the checks share one line.
        for candidate in accepted:
            self.assertTrue(is_valid_ip(candidate), repr(candidate))
        for candidate in rejected:
            self.assertTrue(not is_valid_ip(candidate), repr(candidate))
00155
00156
class TestPortAllocation(unittest.TestCase):
    """Checks that the sockets returned by one ``bind_sockets`` call all
    report the same port number.
    """

    def test_same_port_allocation(self):
        running_on_travis = 'TRAVIS' in os.environ
        if running_on_travis:
            self.skipTest("dual-stack servers often have port conflicts on travis")

        bound = bind_sockets(None, 'localhost')
        try:
            # The first socket's port is the reference value; every other
            # socket in the list must report the same one.
            reference_port = bound[0].getsockname()[1]
            mismatched = [sock for sock in bound[1:]
                          if sock.getsockname()[1] != reference_port]
            self.assertTrue(not mismatched)
        finally:
            # Close every socket whether or not the assertion held.
            for sock in bound:
                sock.close()