00001
00002
00003 import logging
00004 import logging.handlers
00005 import os
00006 from twisted.internet import protocol, reactor
00007 from twisted.internet.defer import Deferred, inlineCallbacks
00008 import sys
00009 import command_with_output
00010
00011 class Quiet:
00012 pass
00013
00014 class AutoShutdown:
00015 pass
00016
00017 class System(protocol.ProcessProtocol):
00018 def __init__(self, *args):
00019 self.deferred = Deferred()
00020 self.quiet = False
00021 autoshutdown = False
00022 while True:
00023 if args[0] == Quiet:
00024 self.quiet = True
00025 elif args[0] == AutoShutdown:
00026 autoshutdown = True
00027 else:
00028 break
00029 args = args[1:]
00030 self.proc = None
00031 self.args = args
00032 self.shutdown_deferreds = []
00033 if autoshutdown:
00034 self.shutdown_trigger = reactor.addSystemEventTrigger('before', 'shutdown', self._shutdown)
00035 else:
00036 self.shutdown_trigger = None
00037 self.proc = reactor.spawnProcess(self, args[0], args, None)
00038
00039 def errReceived(self, data):
00040 if not self.quiet:
00041 print >> sys.stderr, data
00042
00043 def outReceived(self, data):
00044 if not self.quiet:
00045 print >> sys.stdout, data
00046
00047 def processEnded(self, status_object):
00048 if self.shutdown_trigger:
00049 reactor.removeSystemEventTrigger(self.shutdown_trigger)
00050 self.shutdown_trigger = None
00051 self.deferred.callback(status_object.value.exitCode)
00052 for d in self.shutdown_deferreds:
00053 d.callback(status_object.value.exitCode)
00054
00055 def _shutdown(self):
00056 """Called at system shutdown."""
00057 self.shutdown_trigger = None
00058 return self.shutdown()
00059
00060 def shutdown(self):
00061 """Call this to interrupt the program. Returns a deferred that will
00062 be called when shutdown is complete."""
00063 d = Deferred()
00064 self.shutdown_deferreds.append(d)
00065 if self.proc:
00066 print "Killing command:", self.args
00067 self.proc.signalProcess("INT")
00068 else:
00069 d.callback()
00070 return d
00071
00072 def system(*args):
00073 debug = False
00074 if debug:
00075 import time
00076 print time.time(), "system: ", args
00077
00078 s = System(*args)
00079 if debug:
00080 def printout(value):
00081 print time.time(), "system done: ", args
00082 return value
00083 s.deferred.addCallback(printout)
00084 return s.deferred
00085
00086 if __name__ == "__main__":
00087 import unittest
00088 from async_helpers import unittest_with_reactor, async_test
00089 import time
00090
00091 class SystemTest(unittest.TestCase):
00092 @async_test
00093 def test_basic(self):
00094 """Runs a simple command."""
00095 retval = yield system('echo', 'Hello')
00096 self.assertEqual(retval, 0)
00097
00098 @async_test
00099 def test_retval(self):
00100 """Checks that return values work."""
00101 retval = yield system('sh', '-c', 'exit 42')
00102 self.assertEqual(retval, 42)
00103
00104 @async_test
00105 def test_no_return_early(self):
00106 """Checks that System waits until termination."""
00107 start = time.time()
00108 retval = yield system('sleep', '1')
00109 duration = time.time() - start
00110 self.assertEqual(retval, 0)
00111 self.assertTrue(duration >= 1, "%s > 1 is false"%duration)
00112
00113 def run_ros_tests():
00114 import rostest
00115 rostest.unitrun('multi_interface_roam', 'system', SystemTest)
00116
00117 unittest_with_reactor(run_ros_tests)