$search
00001 #! /usr/bin/env python 00002 00003 import logging 00004 import logging.handlers 00005 import os 00006 from twisted.internet import protocol, reactor 00007 from twisted.internet.defer import Deferred, inlineCallbacks 00008 import sys 00009 import command_with_output # There is a SIGCHLD hack in there that we want to run 00010 00011 class Quiet: 00012 pass 00013 00014 class AutoShutdown: 00015 pass 00016 00017 class System(protocol.ProcessProtocol): 00018 def __init__(self, *args): 00019 self.deferred = Deferred() 00020 self.quiet = False 00021 autoshutdown = False 00022 while True: 00023 if args[0] == Quiet: 00024 self.quiet = True 00025 elif args[0] == AutoShutdown: 00026 autoshutdown = True 00027 else: 00028 break 00029 args = args[1:] 00030 self.proc = None 00031 self.args = args 00032 self.shutdown_deferreds = [] 00033 if autoshutdown: 00034 self.shutdown_trigger = reactor.addSystemEventTrigger('before', 'shutdown', self._shutdown) 00035 else: 00036 self.shutdown_trigger = None 00037 self.proc = reactor.spawnProcess(self, args[0], args, None) 00038 00039 def errReceived(self, data): 00040 if not self.quiet: 00041 print >> sys.stderr, data 00042 00043 def outReceived(self, data): 00044 if not self.quiet: 00045 print >> sys.stdout, data 00046 00047 def processEnded(self, status_object): 00048 if self.shutdown_trigger: 00049 reactor.removeSystemEventTrigger(self.shutdown_trigger) 00050 self.shutdown_trigger = None 00051 self.deferred.callback(status_object.value.exitCode) 00052 for d in self.shutdown_deferreds: 00053 d.callback(status_object.value.exitCode) 00054 00055 def _shutdown(self): 00056 """Called at system shutdown.""" 00057 self.shutdown_trigger = None 00058 return self.shutdown() 00059 00060 def shutdown(self): 00061 """Call this to interrupt the program. Returns a deferred that will 00062 be called when shutdown is complete.""" 00063 d = Deferred() 00064 self.shutdown_deferreds.append(d) 00065 if self.proc: 00066 print "Killing command:", self.args 00067 self.proc.signalProcess("INT") 00068 else: 00069 d.callback() 00070 return d 00071 00072 def system(*args): 00073 debug = False 00074 if debug: 00075 import time 00076 print time.time(), "system: ", args 00077 #print "system: ", args 00078 s = System(*args) 00079 if debug: 00080 def printout(value): 00081 print time.time(), "system done: ", args 00082 return value 00083 s.deferred.addCallback(printout) 00084 return s.deferred 00085 00086 if __name__ == "__main__": 00087 import unittest 00088 from async_helpers import unittest_with_reactor, async_test 00089 import time 00090 00091 class SystemTest(unittest.TestCase): 00092 @async_test 00093 def test_basic(self): 00094 """Runs a simple command.""" 00095 retval = yield system('echo', 'Hello') 00096 self.assertEqual(retval, 0) 00097 00098 @async_test 00099 def test_retval(self): 00100 """Checks that return values work.""" 00101 retval = yield system('sh', '-c', 'exit 42') 00102 self.assertEqual(retval, 42) 00103 00104 @async_test 00105 def test_no_return_early(self): 00106 """Checks that System waits until termination.""" 00107 start = time.time() 00108 retval = yield system('sleep', '1') 00109 duration = time.time() - start 00110 self.assertEqual(retval, 0) 00111 self.assertTrue(duration >= 1, "%s > 1 is false"%duration) 00112 00113 def run_ros_tests(): 00114 import rostest 00115 rostest.unitrun('multi_interface_roam', 'system', SystemTest) 00116 00117 unittest_with_reactor(run_ros_tests)