process_test.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 
00004 from __future__ import absolute_import, division, print_function, with_statement
00005 import logging
00006 import os
00007 import signal
00008 import subprocess
00009 import sys
00010 from tornado.httpclient import HTTPClient, HTTPError
00011 from tornado.httpserver import HTTPServer
00012 from tornado.ioloop import IOLoop
00013 from tornado.log import gen_log
00014 from tornado.process import fork_processes, task_id, Subprocess
00015 from tornado.simple_httpclient import SimpleAsyncHTTPClient
00016 from tornado.testing import bind_unused_port, ExpectLog, AsyncTestCase
00017 from tornado.test.util import unittest, skipIfNonUnix
00018 from tornado.web import RequestHandler, Application
00019 
00020 
def skip_if_twisted():
    """Skip the calling test under a Twisted- or asyncio-backed IOLoop.

    Looks at the class name of the currently configured ``IOLoop`` and
    aborts the test when it ends with ``TwistedIOLoop`` or
    ``AsyncIOMainLoop``.

    Raises:
        unittest.SkipTest: if the configured IOLoop class name has one of
            the incompatible suffixes.
    """
    incompatible_suffixes = ('TwistedIOLoop', 'AsyncIOMainLoop')
    loop_class_name = IOLoop.configured_class().__name__
    if not loop_class_name.endswith(incompatible_suffixes):
        return
    raise unittest.SkipTest("Process tests not compatible with "
                            "TwistedIOLoop or AsyncIOMainLoop")
00026 
00027 # Not using AsyncHTTPTestCase because we need control over the IOLoop.
00028 
00029 
@skipIfNonUnix
class ProcessTest(unittest.TestCase):
    """Multi-process test built on ``fork_processes``.

    Three tasks are forked: tasks 0 and 1 serve HTTP on a shared socket,
    task 2 acts as the client that makes the servers exit with various
    status codes and checks that they are (or are not) replaced.
    """

    def get_app(self):
        """Return an Application whose only handler can end its own process.

        ``GET /`` writes the serving process's pid.  ``?exit=N`` makes the
        process exit with status N; ``?signal=N`` sends signal N to itself.
        """

        class ProcessHandler(RequestHandler):
            def get(self):
                exit_status = self.get_argument("exit", None)
                if exit_status:
                    # os._exit rather than sys.exit: the latter raises an
                    # exception that unittest's handler would catch.
                    os._exit(int(exit_status))
                signal_number = self.get_argument("signal", None)
                if signal_number:
                    os.kill(os.getpid(), int(signal_number))
                self.write(str(os.getpid()))

        return Application([("/", ProcessHandler)])

    def tearDown(self):
        """Cancel the pending alarm in the parent; hard-exit in a child."""
        if task_id() is None:
            # This is the surviving (parent) process: drop the alarm that
            # test_multi_process armed, then do the normal teardown.
            signal.alarm(0)
            super(ProcessTest, self).tearDown()
            return
        # Reaching tearDown inside a forked child most likely means an
        # uncaught exception got us here.  Returning would leave two
        # processes running the remainder of the suite, so bail out with a
        # non-zero status; the parent then restarts the child (there is no
        # clean way to report failure to the parent without a restart).
        logging.error("aborting child process from tearDown")
        logging.shutdown()
        os._exit(1)

    def test_multi_process(self):
        """Fork three tasks and verify restart and clean-shutdown behavior."""
        # Cannot run on twisted: the global reactor is used and there is no
        # way to bring it back to a sane state after forking.
        skip_if_twisted()
        expected_logs = "(Starting .* processes|child .* exited|uncaught exception)"
        with ExpectLog(gen_log, expected_logs):
            self.assertFalse(IOLoop.initialized())
            sock, port = bind_unused_port()
            base_url = "http://127.0.0.1:%d" % port

            # Alarms keep any of the processes from living too long.
            signal.alarm(5)  # master process
            try:
                child_id = fork_processes(3, max_restarts=3)
                self.assertTrue(child_id is not None)
                signal.alarm(5)  # child processes
            except SystemExit as exit_exc:
                # A clean exit from fork_processes means every child
                # finished with status 0; we are back in the parent.
                self.assertEqual(exit_exc.code, 0)
                self.assertTrue(task_id() is None)
                sock.close()
                return

            try:
                if child_id == 2:
                    # Client task.
                    self.assertEqual(child_id, task_id())
                    sock.close()
                    # SimpleAsyncHTTPClient is used unconditionally; the
                    # curl client sometimes gets confused when the
                    # connection closes before it has switched from
                    # writing to reading.
                    client = HTTPClient(SimpleAsyncHTTPClient)

                    def fetch(url, fail_ok=False):
                        # With fail_ok, a 599 error (the server went away)
                        # is tolerated and yields None.
                        try:
                            response = client.fetch(base_url + url)
                        except HTTPError as err:
                            if fail_ok and err.code == 599:
                                return None
                            raise
                        return response

                    # Two abnormal exits ...
                    fetch("/?exit=2", fail_ok=True)
                    fetch("/?exit=3", fail_ok=True)

                    # ... are followed by restarts, so this must succeed.
                    int(fetch("/").body)

                    # The equivalent check with signals is disabled: on the
                    # mac a process dying from a signal can pop up an
                    # "Application exited abnormally; send error report to
                    # Apple?" prompt.
                    # fetch("/?signal=%d" % signal.SIGTERM, fail_ok=True)
                    # fetch("/?signal=%d" % signal.SIGABRT, fail_ok=True)
                    # int(fetch("/").body)

                    # A normal exit is not followed by a restart.
                    fetch("/?exit=0", fail_ok=True)
                    # Only one server remains; its pid must change after an
                    # abnormal exit.
                    pid_before = int(fetch("/").body)
                    fetch("/?exit=4", fail_ok=True)
                    pid_after = int(fetch("/").body)
                    self.assertNotEqual(pid_before, pid_after)

                    # Stop the last server normally so everything shuts
                    # down cleanly, then leave without running teardown.
                    fetch("/?exit=0", fail_ok=True)

                    os._exit(0)
                elif child_id in (0, 1):
                    # Server tasks: serve on the shared socket until killed.
                    self.assertEqual(child_id, task_id())
                    server = HTTPServer(self.get_app())
                    server.add_sockets([sock])
                    IOLoop.instance().start()
            except Exception:
                logging.error("exception in child process %d", child_id, exc_info=True)
                raise
00135 
00136 
@skipIfNonUnix
class SubprocessTest(AsyncTestCase):
    """Tests for ``tornado.process.Subprocess`` streams and exit callbacks."""

    def _kill_on_cleanup(self, proc):
        # Register a cleanup that sends SIGTERM to the child's pid.
        self.addCleanup(lambda: os.kill(proc.pid, signal.SIGTERM))

    def _read_until(self, stream, delimiter):
        # Read from `stream` up to `delimiter`, running the loop until done.
        stream.read_until(delimiter, self.stop)
        return self.wait()

    def _read_until_close(self, stream):
        # Read everything remaining on `stream` until it is closed.
        stream.read_until_close(self.stop)
        return self.wait()

    def _start_interpreter(self):
        # Launch an unbuffered interactive Python with stdin/stdout as
        # streams (stderr merged into stdout) and wait for its first prompt.
        proc = Subprocess([sys.executable, '-u', '-i'],
                          stdin=Subprocess.STREAM,
                          stdout=Subprocess.STREAM,
                          stderr=subprocess.STDOUT,
                          io_loop=self.io_loop)
        self._kill_on_cleanup(proc)
        self._read_until(proc.stdout, b'>>> ')
        return proc

    def test_subprocess(self):
        """Drive an interactive interpreter through its stdin/stdout."""
        loop_class_name = IOLoop.configured_class().__name__
        if loop_class_name.endswith('LayeredTwistedIOLoop'):
            # Non-deterministic failures occur with LayeredTwistedIOLoop
            # (read_until('\n') yields '\n' rather than 'hello\n').  This
            # probably points at a problem in TornadoReactor or
            # TwistedIOLoop that has not been tracked down; for now it only
            # produces spurious travis-ci failures, so skip.
            raise unittest.SkipTest("Subprocess tests not compatible with "
                                    "LayeredTwistedIOLoop")
        interpreter = self._start_interpreter()
        interpreter.stdin.write(b"print('hello')\n")
        output = self._read_until(interpreter.stdout, b'\n')
        self.assertEqual(output, b"hello\n")

        self._read_until(interpreter.stdout, b">>> ")
        interpreter.stdin.write(b"raise SystemExit\n")
        output = self._read_until_close(interpreter.stdout)
        self.assertEqual(output, b"")

    def test_close_stdin(self):
        """Closing the parent's stdin handle is noticed by the child."""
        interpreter = self._start_interpreter()
        interpreter.stdin.close()
        output = self._read_until_close(interpreter.stdout)
        self.assertEqual(output, b"\n")

    def test_stderr(self):
        """A child's stderr can be read as a stream."""
        command = [sys.executable, '-u', '-c',
                   r"import sys; sys.stderr.write('hello\n')"]
        child = Subprocess(command,
                           stderr=Subprocess.STREAM,
                           io_loop=self.io_loop)
        self._kill_on_cleanup(child)
        output = self._read_until(child.stderr, b'\n')
        self.assertEqual(output, b'hello\n')

    def test_sigchild(self):
        """The exit callback receives status 0 for a normally exiting child."""
        # Twisted's SIGCHLD handler conflicts with the one Subprocess
        # installs.
        skip_if_twisted()
        Subprocess.initialize(io_loop=self.io_loop)
        self.addCleanup(Subprocess.uninitialize)
        child = Subprocess([sys.executable, '-c', 'pass'],
                           io_loop=self.io_loop)
        child.set_exit_callback(self.stop)
        status = self.wait()
        self.assertEqual(status, 0)
        self.assertEqual(child.returncode, status)

    def test_sigchild_signal(self):
        """The exit callback receives -SIGTERM for a child killed by SIGTERM."""
        skip_if_twisted()
        Subprocess.initialize(io_loop=self.io_loop)
        self.addCleanup(Subprocess.uninitialize)
        command = [sys.executable, '-c', 'import time; time.sleep(30)']
        child = Subprocess(command, io_loop=self.io_loop)
        child.set_exit_callback(self.stop)
        os.kill(child.pid, signal.SIGTERM)
        status = self.wait()
        self.assertEqual(child.returncode, status)
        self.assertEqual(status, -signal.SIGTERM)


rosbridge_tools
Author(s): Jonathan Mace
autogenerated on Sat Dec 27 2014 11:25:59