Go to the documentation of this file.00001
00002
00003 import logging
00004 import logging.handlers
00005 import os
00006 from twisted.internet import protocol, reactor
00007 from twisted.internet.defer import Deferred, inlineCallbacks
00008 import sys
00009 import command_with_output
00010
class Quiet:
    """Option marker for System()/system().

    Pass the class itself (not an instance) ahead of the command words.
    System then sets its ``quiet`` attribute and stops echoing the child's
    stdout/stderr data.
    """
00013
class AutoShutdown:
    """Option marker for System()/system().

    Pass the class itself (not an instance) ahead of the command words.
    System then registers a 'before shutdown' reactor trigger that
    interrupts the command when the reactor shuts down.
    """
00016
class System(protocol.ProcessProtocol):
    """Run an external command under the Twisted reactor.

    The constructor takes the command's argv as positional arguments,
    optionally preceded by any number of the marker classes ``Quiet`` and
    ``AutoShutdown``.  The process is spawned immediately.

    Attributes:
        deferred: fires with the child's exit code once the process ends.
        quiet: when True the child's stdout/stderr data is not echoed.
        args: the command argv with the option markers removed.
        proc: the process transport returned by reactor.spawnProcess.
        shutdown_deferreds: Deferreds handed out by shutdown() that are
            still waiting for the process to end.
        shutdown_trigger: handle of the 'before shutdown' reactor trigger,
            or None when no trigger is registered.
    """

    def __init__(self, *args):
        """Parse the option markers and spawn the command.

        Raises:
            IndexError: if no command is left once the markers are removed.
        """
        self.deferred = Deferred()
        self.quiet = False
        autoshutdown = False
        # Strip leading option markers; the first argument that is not a
        # marker starts the actual command line.
        while args:
            if args[0] == Quiet:
                self.quiet = True
            elif args[0] == AutoShutdown:
                autoshutdown = True
            else:
                break
            args = args[1:]
        if not args:
            # Same exception type an empty command always produced, but
            # with a usable message and before any trigger is registered.
            raise IndexError("System() requires a command to run")
        self.proc = None
        self.args = args
        self.shutdown_deferreds = []
        # Completion state, so that a shutdown() issued after the process
        # ended can be answered immediately with the recorded exit code.
        self._ended = False
        self._exit_code = None
        if autoshutdown:
            self.shutdown_trigger = reactor.addSystemEventTrigger(
                'before', 'shutdown', self._shutdown)
        else:
            self.shutdown_trigger = None
        # env is passed as None; see Twisted's spawnProcess documentation
        # for what that means on each platform.
        self.proc = reactor.spawnProcess(self, args[0], args, None)

    def errReceived(self, data):
        """Echo a chunk of the child's stderr unless running quietly."""
        if not self.quiet:
            # A newline follows every chunk, exactly as a print statement
            # would do; write() keeps the module parseable on Python 3.
            sys.stderr.write("%s\n" % (data,))

    def outReceived(self, data):
        """Echo a chunk of the child's stdout unless running quietly."""
        if not self.quiet:
            sys.stdout.write("%s\n" % (data,))

    def processEnded(self, status_object):
        """Record the exit code and fire every waiting Deferred with it.

        ``status_object`` is the Failure supplied by Twisted; the code is
        read from ``status_object.value.exitCode``.
        """
        if self.shutdown_trigger:
            reactor.removeSystemEventTrigger(self.shutdown_trigger)
            self.shutdown_trigger = None
        exit_code = status_object.value.exitCode
        self._ended = True
        self._exit_code = exit_code
        # Detach the waiters before firing them so each Deferred fires
        # exactly once, even if a callback calls shutdown() again.
        waiting, self.shutdown_deferreds = self.shutdown_deferreds, []
        self.deferred.callback(exit_code)
        for d in waiting:
            d.callback(exit_code)

    def _shutdown(self):
        """Called at system shutdown."""
        # The reactor is consuming the trigger right now, so processEnded
        # must not try to remove it afterwards.
        self.shutdown_trigger = None
        return self.shutdown()

    def shutdown(self):
        """Call this to interrupt the program. Returns a deferred that will
        be called when shutdown is complete.

        The Deferred fires with the process exit code.  If the process has
        already ended (or was never started) it fires immediately.
        """
        from twisted.internet.error import ProcessExitedAlready

        d = Deferred()
        if self._ended or not self.proc:
            # Nothing left to interrupt.  Deferred.callback() needs a
            # result argument, so hand back the recorded exit code.
            d.callback(self._exit_code)
            return d
        self.shutdown_deferreds.append(d)
        sys.stdout.write("Killing command: %s\n" % (self.args,))
        try:
            self.proc.signalProcess("INT")
        except ProcessExitedAlready:
            # The child exited but processEnded has not been delivered
            # yet; d stays queued and fires when it is.
            pass
        return d
00071
def system(*args):
    """Start a command through System and return its completion Deferred.

    All positional arguments are forwarded unchanged to System.  The
    returned Deferred is the ``deferred`` attribute of the System instance.
    """
    debug = False  # set to True to trace command start and completion
    if not debug:
        return System(*args).deferred

    import time
    sys.stdout.write("%s %s %s\n" % (time.time(), "system: ", args))
    runner = System(*args)

    def report_done(result):
        # Log completion, then pass the result through untouched.
        sys.stdout.write("%s %s %s\n" % (time.time(), "system done: ", args))
        return result

    runner.deferred.addCallback(report_done)
    return runner.deferred
00085
if __name__ == "__main__":
    # Self-test harness; only runs when the module is executed directly.
    import unittest
    from async_helpers import unittest_with_reactor, async_test
    import time

    class SystemTest(unittest.TestCase):
        # NOTE(review): async_test presumably drives these generator-style
        # tests under the reactor; confirm against async_helpers.

        @async_test
        def test_basic(self):
            """Runs a simple command."""
            exit_code = yield system('echo', 'Hello')
            self.assertEqual(exit_code, 0)

        @async_test
        def test_retval(self):
            """Checks that return values work."""
            exit_code = yield system('sh', '-c', 'exit 42')
            self.assertEqual(exit_code, 42)

        @async_test
        def test_no_return_early(self):
            """Checks that System waits until termination."""
            began = time.time()
            exit_code = yield system('sleep', '1')
            elapsed = time.time() - began
            self.assertEqual(exit_code, 0)
            self.assertTrue(elapsed >= 1, "%s > 1 is false" % elapsed)

    def run_ros_tests():
        # rostest is imported lazily so it is only needed when the tests
        # are actually launched.
        import rostest
        rostest.unitrun('multi_interface_roam', 'system', SystemTest)

    unittest_with_reactor(run_ros_tests)