command_with_output.py
Go to the documentation of this file.
00001 #! /usr/bin/env python
00002 
00003 import logging
00004 import logging.handlers
00005 import os
00006 from twisted.internet import protocol, reactor
00007 import signal
00008 
00009 logging_enabled = False
00010 
00011 # This is a workaround for older versions of twisted that use SIGCHLD, but
00012 # do not set SA_RESTART.
00013 def sa_restart_hack():
00014     import twisted.internet.base
00015     old_handleSignals = twisted.internet.base._SignalReactorMixin._handleSignals
00016     def _handleSignals(self):
00017         old_handleSignals(self)
00018         signal.siginterrupt(signal.SIGCHLD, False)
00019     twisted.internet.base._SignalReactorMixin._handleSignals = _handleSignals
00020 sa_restart_hack()
00021 
00022 class CommandWithOutput(protocol.ProcessProtocol):
00023     def __init__(self, args, name):
00024         self.restart_delay = 0.2
00025         self.logger = logging.getLogger(name)
00026         self.console_logger = logging.getLogger('console.%s'%name)
00027         if logging_enabled:
00028             logger_handler = logging.handlers.TimedRotatingFileHandler(os.path.join(logdir,'%s.log'%name), when='midnight', backupCount=logfilecount)
00029             logger_handler.setFormatter(file_formatter)
00030             self.logger.addHandler(logger_handler)
00031             self.console_logger.addHandler(logger_handler)
00032             logger_handler.setLevel(logging.DEBUG)
00033         self.logger.setLevel(logging.DEBUG)
00034         self.console_logger.setLevel(logging.DEBUG)
00035         self.proc_args = args
00036         self.outline = ""
00037         self.errline = ""
00038         self.shutting_down = False
00039         self.proc = None
00040         self.shutdown_trigger = reactor.addSystemEventTrigger('during', 'shutdown', self.shutdown)
00041         reactor.callWhenRunning(self.start_proc)
00042                             
00043     def errReceived(self, data):
00044         self.errline = self.data_received(self.errline, data)
00045 
00046     def outReceived(self, data):
00047         self.outline = self.data_received(self.outline, data)
00048 
00049     def data_received(self, curline, data):
00050         curline += data
00051         while True:
00052             splitpos = curline.find('\n')
00053             if splitpos == -1:
00054                 break
00055             self.got_line(curline[:splitpos])
00056             curline = curline[splitpos+1:]
00057         return curline
00058 
00059     def processEnded(self, status_object):
00060         if self.outline: 
00061             self.got_line(self.outline)
00062         if self.errline:
00063             self.got_line(self.errline)
00064         if self.shutting_down:
00065             return
00066         self.console_logger.info("Process died, restarting: %s"%(" ".join(self.proc_args)))
00067         self.start_proc()
00068     
00069     def start_proc(self):
00070         try:
00071             self.proc = reactor.spawnProcess(self, self.proc_args[0], self.proc_args, None)
00072             self.child_restart()
00073         except OSError:
00074             self.console_logger.fatal("Error trying to run: %s"%(" ".join(self.proc_args)))
00075 
00076     def child_restart(self):
00077         pass # Can be overridden by derived classes.
00078 
00079     def _got_line(self, line):
00080         self.logger.info(line)
00081         try:
00082             self.got_line(line)
00083         except Exception, e:
00084             self.console_logger.fatal("Caught exception in CommandWithOutput.run: %s"%str(e))
00085             raise # FIXME Remove this?
00086 
00087     def shutdown(self):
00088         self.shutting_down = True
00089         try:
00090             reactor.removeSystemEventTrigger(self.shutdown_trigger)
00091         except ValueError:
00092             pass # We may have been called automatically at shutdown.
00093         if self.proc:
00094             self.proc.signalProcess("INT")
00095 
00096 if __name__ == "__main__":
00097     import unittest
00098     from async_helpers import unittest_with_reactor, async_test
00099     from twisted.internet.defer import Deferred
00100 
00101     class CommandWithOutputTest(unittest.TestCase):
00102         @async_test
00103         def test_basic(self):
00104             """Runs a hello command, and checks that Hello gets read, and
00105             that child_restart gets called."""
00106             class Tst(CommandWithOutput):
00107                 def __init__(self):
00108                     self.deferred = Deferred()
00109                     self.lines = []
00110                     self.starts = 0
00111                     CommandWithOutput.__init__(self, ['echo', 'Hello'], "test")
00112 
00113                 def got_line(self, line):
00114                     self.lines.append(line)
00115            
00116                 def child_restart(self):
00117                     if self.starts < 2:
00118                         self.starts += 1
00119                         return
00120                     self.shutdown()
00121                     self.deferred.callback(self.lines)
00122 
00123             lines = yield Tst().deferred
00124             self.assertEqual(lines, ['Hello', 'Hello'])
00125 
00126         @async_test
00127         def test_kill(self):
00128             class Tst(CommandWithOutput):
00129                 def __init__(self):
00130                     self.deferred = Deferred()
00131                     self.count = 0
00132                     self.second_start = False
00133                     CommandWithOutput.__init__(self, ['yes', 'yes'], "test")
00134 
00135                 def got_line(self, line):
00136                     self.count += 1
00137                     if self.count == 100:
00138                         self.deferred.callback(self.count)
00139                         self.shutdown()
00140 
00141             count = yield Tst().deferred
00142             self.assertEqual(count, 100)
00143 
00144         @async_test
00145         def test_restart(self):
00146             class Tst(CommandWithOutput):
00147                 def __init__(self):
00148                     self.deferred = Deferred()
00149                     self.count = 0
00150                     self.second_start = False
00151                     self.restarts = 0
00152                     CommandWithOutput.__init__(self, ['yes', 'yes'], "test")
00153 
00154                 def got_line(self, line):
00155                     self.count += 1
00156                     if self.count == 100:
00157                         self.proc.signalProcess("INT")
00158                     if self.restarts > 1 and not self.deferred.called:
00159                         self.deferred.callback(self.restarts)
00160                         self.shutdown()
00161                 
00162                 def child_restart(self):
00163                     self.restarts += 1
00164 
00165             count = yield Tst().deferred
00166             self.assertEqual(count, 2)
00167         
00168 
00169     def run_ros_tests():
00170         import rostest
00171         rostest.unitrun('multi_interface_roam', 'command_with_output', CommandWithOutputTest)
00172     
00173     unittest_with_reactor(run_ros_tests)


multi_interface_roam
Author(s): Blaise Gassend
autogenerated on Thu Jan 2 2014 11:26:15