00001
00002
00003
00004 from __future__ import absolute_import, division, print_function, with_statement
00005 import logging
00006 import os
00007 import signal
00008 import subprocess
00009 import sys
00010 from tornado.httpclient import HTTPClient, HTTPError
00011 from tornado.httpserver import HTTPServer
00012 from tornado.ioloop import IOLoop
00013 from tornado.log import gen_log
00014 from tornado.process import fork_processes, task_id, Subprocess
00015 from tornado.simple_httpclient import SimpleAsyncHTTPClient
00016 from tornado.testing import bind_unused_port, ExpectLog, AsyncTestCase
00017 from tornado.test.util import unittest, skipIfNonUnix
00018 from tornado.web import RequestHandler, Application
00019
00020
def skip_if_twisted():
    """Skip the calling test when the configured IOLoop is unsupported.

    Raises:
        unittest.SkipTest: if the name of the configured IOLoop class ends
            with ``TwistedIOLoop`` or ``AsyncIOMainLoop``.
    """
    unsupported_suffixes = ('TwistedIOLoop', 'AsyncIOMainLoop')
    loop_class_name = IOLoop.configured_class().__name__
    if not loop_class_name.endswith(unsupported_suffixes):
        return
    raise unittest.SkipTest("Process tests not compatible with "
                            "TwistedIOLoop or AsyncIOMainLoop")
00026
00027
00028
00029
@skipIfNonUnix
class ProcessTest(unittest.TestCase):
    """Exercise ``fork_processes`` with two HTTP server children and one
    client child that makes the servers exit with various statuses.
    """

    def get_app(self):
        """Return an Application whose only handler reports its pid.

        The handler also honours two optional query arguments: ``exit``
        (terminate the serving process with that status) and ``signal``
        (send that signal number to the serving process).
        """

        class ProcessHandler(RequestHandler):
            def get(self):
                exit_status = self.get_argument("exit", None)
                if exit_status:
                    # os._exit ends the process immediately; unlike
                    # sys.exit it does not raise SystemExit, so nothing
                    # further up the stack can intercept it.
                    os._exit(int(exit_status))
                signal_number = self.get_argument("signal", None)
                if signal_number:
                    os.kill(os.getpid(), int(signal_number))
                self.write(str(os.getpid()))

        return Application([("/", ProcessHandler)])

    def tearDown(self):
        """Cancel the pending alarm, or abort if running in a forked child."""
        if task_id() is None:
            # Not a forked child: cancel the alarm armed by the test and
            # finish the normal teardown.
            signal.alarm(0)
            super(ProcessTest, self).tearDown()
            return
        # A forked child reached tearDown.  Exit with a non-zero status
        # instead of returning into the test runner from this process.
        logging.error("aborting child process from tearDown")
        logging.shutdown()
        os._exit(1)

    def _run_server_child(self, child_id, sock):
        """Serve the test application on ``sock`` until the process dies."""
        self.assertEqual(child_id, task_id())
        http_server = HTTPServer(self.get_app())
        http_server.add_sockets([sock])
        IOLoop.instance().start()

    def _run_client_child(self, child_id, sock, port):
        """Drive the server children over HTTP, then exit with status 0."""
        self.assertEqual(child_id, task_id())
        # This child only issues requests; it does not need the listener.
        sock.close()
        client = HTTPClient(SimpleAsyncHTTPClient)

        def fetch(url, fail_ok=False):
            # Returns the response, or None when a 599 error is tolerated
            # because fail_ok was set.
            try:
                return client.fetch("http://127.0.0.1:%d%s" % (port, url))
            except HTTPError as fetch_error:
                if not fail_ok or fetch_error.code != 599:
                    raise

        # Ask two server processes to exit with non-zero statuses.
        fetch("/?exit=2", fail_ok=True)
        fetch("/?exit=3", fail_ok=True)

        # A plain request must still succeed and return an integer pid.
        int(fetch("/").body)

        # Ask one server process to exit with status 0.
        fetch("/?exit=0", fail_ok=True)

        # The pid that answers must change after its process is made to
        # exit with a non-zero status.
        first_pid = int(fetch("/").body)
        fetch("/?exit=4", fail_ok=True)
        second_pid = int(fetch("/").body)
        self.assertNotEqual(first_pid, second_pid)

        # Ask the remaining server to exit with status 0, then leave too.
        fetch("/?exit=0", fail_ok=True)

        os._exit(0)

    def test_multi_process(self):
        """Fork three children (ids 0 and 1 serve, id 2 is the client) and
        check that ``fork_processes`` finally exits with status 0 in the
        process that is not a forked task.
        """
        skip_if_twisted()
        expected_logs = "(Starting .* processes|child .* exited|uncaught exception)"
        with ExpectLog(gen_log, expected_logs):
            self.assertFalse(IOLoop.initialized())
            sock, port = bind_unused_port()

            # Arm an alarm so this process cannot hang forever.
            signal.alarm(5)
            try:
                child_id = fork_processes(3, max_restarts=3)
                self.assertTrue(child_id is not None)
                # fork_processes returned, so re-arm the alarm here too.
                signal.alarm(5)
            except SystemExit as exit_exc:
                # fork_processes raised SystemExit; it must carry status 0
                # and this process must not be a forked task.
                self.assertEqual(exit_exc.code, 0)
                self.assertTrue(task_id() is None)
                sock.close()
                return

            try:
                if child_id == 2:
                    self._run_client_child(child_id, sock, port)
                elif child_id in (0, 1):
                    self._run_server_child(child_id, sock)
            except Exception:
                logging.error("exception in child process %d", child_id, exc_info=True)
                raise
00135
00136
@skipIfNonUnix
class SubprocessTest(AsyncTestCase):
    """Tests for ``tornado.process.Subprocess`` stream and exit handling."""

    @staticmethod
    def _terminate_and_reap(pid):
        """Send SIGTERM to ``pid`` and wait for it so no zombie is left.

        Signalling alone leaves the terminated child as an unreaped zombie
        for the rest of the test run; waiting on it releases the process
        table entry.
        """
        import errno

        os.kill(pid, signal.SIGTERM)
        try:
            os.waitpid(pid, 0)
        except OSError as e:
            # ECHILD means the child has already been reaped elsewhere;
            # there is nothing left to collect.
            if e.errno != errno.ECHILD:
                raise

    def _terminate_on_cleanup(self, subproc):
        """Register a cleanup that terminates and reaps ``subproc``."""
        self.addCleanup(self._terminate_and_reap, subproc.pid)

    def _spawn_interactive_python(self):
        """Start an unbuffered interactive interpreter with piped stdio.

        stdin and stdout are exposed as streams and stderr is merged into
        stdout.  The child is terminated and reaped during test cleanup.

        Returns:
            The started ``Subprocess``.
        """
        subproc = Subprocess([sys.executable, '-u', '-i'],
                             stdin=Subprocess.STREAM,
                             stdout=Subprocess.STREAM, stderr=subprocess.STDOUT,
                             io_loop=self.io_loop)
        self._terminate_on_cleanup(subproc)
        return subproc

    def test_subprocess(self):
        """Round-trip a command through an interactive interpreter."""
        if IOLoop.configured_class().__name__.endswith('LayeredTwistedIOLoop'):
            raise unittest.SkipTest("Subprocess tests not compatible with "
                                    "LayeredTwistedIOLoop")
        subproc = self._spawn_interactive_python()
        # Wait for the first prompt before sending anything.
        subproc.stdout.read_until(b'>>> ', self.stop)
        self.wait()
        subproc.stdin.write(b"print('hello')\n")
        subproc.stdout.read_until(b'\n', self.stop)
        data = self.wait()
        self.assertEqual(data, b"hello\n")

        # After the next prompt, make the interpreter exit; no further
        # output is expected before stdout closes.
        subproc.stdout.read_until(b">>> ", self.stop)
        self.wait()
        subproc.stdin.write(b"raise SystemExit\n")
        subproc.stdout.read_until_close(self.stop)
        data = self.wait()
        self.assertEqual(data, b"")

    def test_close_stdin(self):
        """Closing stdin ends the interpreter after a single newline."""
        subproc = self._spawn_interactive_python()
        subproc.stdout.read_until(b'>>> ', self.stop)
        self.wait()
        subproc.stdin.close()
        subproc.stdout.read_until_close(self.stop)
        data = self.wait()
        self.assertEqual(data, b"\n")

    def test_stderr(self):
        """Output written to the child's stderr arrives on the stderr stream."""
        subproc = Subprocess([sys.executable, '-u', '-c',
                              r"import sys; sys.stderr.write('hello\n')"],
                             stderr=Subprocess.STREAM,
                             io_loop=self.io_loop)
        self._terminate_on_cleanup(subproc)
        subproc.stderr.read_until(b'\n', self.stop)
        data = self.wait()
        self.assertEqual(data, b'hello\n')

    def test_sigchild(self):
        """The exit callback receives status 0 for a child that exits normally."""
        skip_if_twisted()
        Subprocess.initialize(io_loop=self.io_loop)
        self.addCleanup(Subprocess.uninitialize)
        subproc = Subprocess([sys.executable, '-c', 'pass'],
                             io_loop=self.io_loop)
        subproc.set_exit_callback(self.stop)
        ret = self.wait()
        self.assertEqual(ret, 0)
        self.assertEqual(subproc.returncode, ret)

    def test_sigchild_signal(self):
        """A child killed by SIGTERM reports ``-signal.SIGTERM`` as its status."""
        skip_if_twisted()
        Subprocess.initialize(io_loop=self.io_loop)
        self.addCleanup(Subprocess.uninitialize)
        subproc = Subprocess([sys.executable, '-c',
                              'import time; time.sleep(30)'],
                             io_loop=self.io_loop)
        subproc.set_exit_callback(self.stop)
        os.kill(subproc.pid, signal.SIGTERM)
        ret = self.wait()
        self.assertEqual(subproc.returncode, ret)
        self.assertEqual(ret, -signal.SIGTERM)