#! /usr/bin/env python
"""Run an external command under the Twisted reactor, deliver its output
line by line to a callback, and respawn the command whenever it exits."""

import logging
import logging.handlers
import os
import signal

from twisted.internet import error, protocol, reactor

# When True, CommandWithOutput.__init__ attaches a rotating file handler to
# its loggers.
# NOTE(review): that branch reads logdir, logfilecount and file_formatter,
# none of which are defined in this file -- they must exist before this flag
# is turned on.
logging_enabled = False

# This is a workaround for older versions of twisted that use SIGCHLD, but
# do not set SA_RESTART.
def sa_restart_hack():
    """Wrap the reactor's signal setup so that, once it has run, SIGCHLD is
    marked as non-interrupting via signal.siginterrupt."""
    import twisted.internet.base
    old_handleSignals = twisted.internet.base._SignalReactorMixin._handleSignals
    def _handleSignals(self):
        old_handleSignals(self)
        signal.siginterrupt(signal.SIGCHLD, False)
    twisted.internet.base._SignalReactorMixin._handleSignals = _handleSignals
sa_restart_hack()

class CommandWithOutput(protocol.ProcessProtocol):
    """Process protocol that keeps a command running and splits its output
    into lines.

    Data from stdout and stderr is buffered separately; every complete line
    (without its trailing newline) is passed to self.got_line(line).  This
    class does not define got_line, so subclasses have to.  When the process
    ends and shutdown() has not been called, the command is started again.
    """

    def __init__(self, args, name):
        """Set up loggers and buffers and schedule the first start.

        args -- argument list for the command; args[0] is the executable.
        name -- used for the logger names (name and 'console.<name>') and,
                when logging_enabled is set, for the '<name>.log' file.
        """
        # NOTE(review): stored but not consulted anywhere in this file;
        # processEnded restarts the command immediately.
        self.restart_delay = 0.2
        self.logger = logging.getLogger(name)
        self.console_logger = logging.getLogger('console.%s'%name)
        # NOTE(review): the original indentation was lost; all handler and
        # level setup below is assumed to belong to this branch -- confirm.
        if logging_enabled:
            logger_handler = logging.handlers.TimedRotatingFileHandler(os.path.join(logdir,'%s.log'%name), when='midnight', backupCount=logfilecount)
            logger_handler.setFormatter(file_formatter)
            self.logger.addHandler(logger_handler)
            self.console_logger.addHandler(logger_handler)
            logger_handler.setLevel(logging.DEBUG)
            self.logger.setLevel(logging.DEBUG)
            self.console_logger.setLevel(logging.DEBUG)
        self.proc_args = args
        # Incomplete trailing line received so far on stdout / stderr.
        self.outline = ""
        self.errline = ""
        self.shutting_down = False
        self.proc = None
        self.shutdown_trigger = reactor.addSystemEventTrigger('during', 'shutdown', self.shutdown)
        reactor.callWhenRunning(self.start_proc)

    def errReceived(self, data):
        """Feed a chunk of stderr data into the stderr line buffer."""
        self.errline = self.data_received(self.errline, data)

    def outReceived(self, data):
        """Feed a chunk of stdout data into the stdout line buffer."""
        self.outline = self.data_received(self.outline, data)

    def data_received(self, curline, data):
        """Append data to curline, emit every complete line, and return the
        remaining unterminated tail.

        curline -- text buffered from earlier chunks.
        data    -- newly received text.
        """
        pieces = (curline + data).split('\n')
        # Everything before the last separator is a complete line; the final
        # piece is whatever follows the last newline (possibly empty).
        for line in pieces[:-1]:
            self.got_line(line)
        return pieces[-1]

    def processEnded(self, status_object):
        """Flush any unterminated output, then restart the command unless
        shutdown() has been called."""
        # Clear each buffer before delivering it so that the fragment is not
        # prepended to (and re-emitted with) the next process's output.
        if self.outline:
            leftover, self.outline = self.outline, ""
            self.got_line(leftover)
        if self.errline:
            leftover, self.errline = self.errline, ""
            self.got_line(leftover)
        if self.shutting_down:
            return
        self.console_logger.info("Process died, restarting: %s"%(" ".join(self.proc_args)))
        self.start_proc()

    def start_proc(self):
        """Spawn the command and notify child_restart(); an OSError is
        logged rather than propagated."""
        # Drop the handle of the previous (ended) process so that a failed
        # spawn does not leave shutdown() signalling a dead process.
        self.proc = None
        try:
            self.proc = reactor.spawnProcess(self, self.proc_args[0], self.proc_args, None)
            self.child_restart()
        except OSError:
            self.console_logger.fatal("Error trying to run: %s"%(" ".join(self.proc_args)))

    def child_restart(self):
        """Hook called after each successful spawn."""
        pass # Can be overridden by derived classes.

    def _got_line(self, line):
        """Log line, then pass it to got_line(), logging and re-raising any
        exception.

        NOTE(review): nothing in this file calls this wrapper; the receive
        path calls got_line() directly.
        """
        self.logger.info(line)
        try:
            self.got_line(line)
        except Exception as e:
            self.console_logger.fatal("Caught exception in CommandWithOutput.run: %s"%str(e))
            raise # FIXME Remove this?

    def shutdown(self):
        """Stop restarting the command and send SIGINT to the running process."""
        self.shutting_down = True
        try:
            reactor.removeSystemEventTrigger(self.shutdown_trigger)
        except ValueError:
            pass # We may have been called automatically at shutdown.
        if self.proc:
            try:
                self.proc.signalProcess("INT")
            except error.ProcessExitedAlready:
                pass # The process is already gone; nothing to interrupt.

if __name__ == "__main__":
    import unittest
    from async_helpers import unittest_with_reactor, async_test
    from twisted.internet.defer import Deferred

    class CommandWithOutputTest(unittest.TestCase):
        @async_test
        def test_basic(self):
            """Runs a hello command, and checks that Hello gets read, and
            that child_restart gets called."""
            class Tst(CommandWithOutput):
                def __init__(self):
                    self.deferred = Deferred()
                    self.lines = []
                    self.starts = 0
                    CommandWithOutput.__init__(self, ['echo', 'Hello'], "test")

                def got_line(self, line):
                    self.lines.append(line)

                def child_restart(self):
                    if self.starts < 2:
                        self.starts += 1
                        return
                    self.shutdown()
                    self.deferred.callback(self.lines)

            lines = yield Tst().deferred
            self.assertEqual(lines, ['Hello', 'Hello'])

        @async_test
        def test_kill(self):
            class Tst(CommandWithOutput):
                def __init__(self):
                    self.deferred = Deferred()
                    self.count = 0
                    self.second_start = False
                    CommandWithOutput.__init__(self, ['yes', 'yes'], "test")

                def got_line(self, line):
                    self.count += 1
                    if self.count == 100:
                        self.deferred.callback(self.count)
                        self.shutdown()

            count = yield Tst().deferred
            self.assertEqual(count, 100)

        @async_test
        def test_restart(self):
            class Tst(CommandWithOutput):
                def __init__(self):
                    self.deferred = Deferred()
                    self.count = 0
                    self.second_start = False
                    self.restarts = 0
                    CommandWithOutput.__init__(self, ['yes', 'yes'], "test")

                def got_line(self, line):
                    self.count += 1
                    if self.count == 100:
                        self.proc.signalProcess("INT")
                    if self.restarts > 1 and not self.deferred.called:
                        self.deferred.callback(self.restarts)
                        self.shutdown()

                def child_restart(self):
                    self.restarts += 1

            count = yield Tst().deferred
            self.assertEqual(count, 2)


    def run_ros_tests():
        import rostest
        rostest.unitrun('multi_interface_roam', 'command_with_output', CommandWithOutputTest)

    unittest_with_reactor(run_ros_tests)