00001
00002
00003 import logging
00004 import logging.handlers
00005 import os
00006 from twisted.internet import protocol, reactor
00007 import signal
00008
00009 logging_enabled = False
00010
00011
00012
00013 def sa_restart_hack():
00014 import twisted.internet.base
00015 old_handleSignals = twisted.internet.base._SignalReactorMixin._handleSignals
00016 def _handleSignals(self):
00017 old_handleSignals(self)
00018 signal.siginterrupt(signal.SIGCHLD, False)
00019 twisted.internet.base._SignalReactorMixin._handleSignals = _handleSignals
00020 sa_restart_hack()
00021
00022 class CommandWithOutput(protocol.ProcessProtocol):
00023 def __init__(self, args, name):
00024 self.restart_delay = 0.2
00025 self.logger = logging.getLogger(name)
00026 self.console_logger = logging.getLogger('console.%s'%name)
00027 if logging_enabled:
00028 logger_handler = logging.handlers.TimedRotatingFileHandler(os.path.join(logdir,'%s.log'%name), when='midnight', backupCount=logfilecount)
00029 logger_handler.setFormatter(file_formatter)
00030 self.logger.addHandler(logger_handler)
00031 self.console_logger.addHandler(logger_handler)
00032 logger_handler.setLevel(logging.DEBUG)
00033 self.logger.setLevel(logging.DEBUG)
00034 self.console_logger.setLevel(logging.DEBUG)
00035 self.proc_args = args
00036 self.outline = ""
00037 self.errline = ""
00038 self.shutting_down = False
00039 self.proc = None
00040 self.shutdown_trigger = reactor.addSystemEventTrigger('during', 'shutdown', self.shutdown)
00041 reactor.callWhenRunning(self.start_proc)
00042
00043 def errReceived(self, data):
00044 self.errline = self.data_received(self.errline, data)
00045
00046 def outReceived(self, data):
00047 self.outline = self.data_received(self.outline, data)
00048
00049 def data_received(self, curline, data):
00050 curline += data
00051 while True:
00052 splitpos = curline.find('\n')
00053 if splitpos == -1:
00054 break
00055 self.got_line(curline[:splitpos])
00056 curline = curline[splitpos+1:]
00057 return curline
00058
00059 def processEnded(self, status_object):
00060 if self.outline:
00061 self.got_line(self.outline)
00062 if self.errline:
00063 self.got_line(self.errline)
00064 if self.shutting_down:
00065 return
00066 self.console_logger.info("Process died, restarting: %s"%(" ".join(self.proc_args)))
00067 self.start_proc()
00068
00069 def start_proc(self):
00070 try:
00071 self.proc = reactor.spawnProcess(self, self.proc_args[0], self.proc_args, None)
00072 self.child_restart()
00073 except OSError:
00074 self.console_logger.fatal("Error trying to run: %s"%(" ".join(self.proc_args)))
00075
00076 def child_restart(self):
00077 pass
00078
00079 def _got_line(self, line):
00080 self.logger.info(line)
00081 try:
00082 self.got_line(line)
00083 except Exception, e:
00084 self.console_logger.fatal("Caught exception in CommandWithOutput.run: %s"%str(e))
00085 raise
00086
00087 def shutdown(self):
00088 self.shutting_down = True
00089 try:
00090 reactor.removeSystemEventTrigger(self.shutdown_trigger)
00091 except ValueError:
00092 pass
00093 if self.proc:
00094 self.proc.signalProcess("INT")
00095
00096 if __name__ == "__main__":
00097 import unittest
00098 from async_helpers import unittest_with_reactor, async_test
00099 from twisted.internet.defer import Deferred
00100
00101 class CommandWithOutputTest(unittest.TestCase):
00102 @async_test
00103 def test_basic(self):
00104 """Runs a hello command, and checks that Hello gets read, and
00105 that child_restart gets called."""
00106 class Tst(CommandWithOutput):
00107 def __init__(self):
00108 self.deferred = Deferred()
00109 self.lines = []
00110 self.starts = 0
00111 CommandWithOutput.__init__(self, ['echo', 'Hello'], "test")
00112
00113 def got_line(self, line):
00114 self.lines.append(line)
00115
00116 def child_restart(self):
00117 if self.starts < 2:
00118 self.starts += 1
00119 return
00120 self.shutdown()
00121 self.deferred.callback(self.lines)
00122
00123 lines = yield Tst().deferred
00124 self.assertEqual(lines, ['Hello', 'Hello'])
00125
00126 @async_test
00127 def test_kill(self):
00128 class Tst(CommandWithOutput):
00129 def __init__(self):
00130 self.deferred = Deferred()
00131 self.count = 0
00132 self.second_start = False
00133 CommandWithOutput.__init__(self, ['yes', 'yes'], "test")
00134
00135 def got_line(self, line):
00136 self.count += 1
00137 if self.count == 100:
00138 self.deferred.callback(self.count)
00139 self.shutdown()
00140
00141 count = yield Tst().deferred
00142 self.assertEqual(count, 100)
00143
00144 @async_test
00145 def test_restart(self):
00146 class Tst(CommandWithOutput):
00147 def __init__(self):
00148 self.deferred = Deferred()
00149 self.count = 0
00150 self.second_start = False
00151 self.restarts = 0
00152 CommandWithOutput.__init__(self, ['yes', 'yes'], "test")
00153
00154 def got_line(self, line):
00155 self.count += 1
00156 if self.count == 100:
00157 self.proc.signalProcess("INT")
00158 if self.restarts > 1 and not self.deferred.called:
00159 self.deferred.callback(self.restarts)
00160 self.shutdown()
00161
00162 def child_restart(self):
00163 self.restarts += 1
00164
00165 count = yield Tst().deferred
00166 self.assertEqual(count, 2)
00167
00168
00169 def run_ros_tests():
00170 import rostest
00171 rostest.unitrun('multi_interface_roam', 'command_with_output', CommandWithOutputTest)
00172
00173 unittest_with_reactor(run_ros_tests)