system.py
Go to the documentation of this file.
00001 #! /usr/bin/env python
00002 
00003 import logging
00004 import logging.handlers
00005 import os
00006 from twisted.internet import protocol, reactor
00007 from twisted.internet.defer import Deferred, inlineCallbacks
00008 import sys
00009 import command_with_output # There is a SIGCHLD hack in there that we want to run
00010 
00011 class Quiet:
00012     pass
00013 
00014 class AutoShutdown:
00015     pass
00016 
00017 class System(protocol.ProcessProtocol):
00018     def __init__(self, *args):
00019         self.deferred = Deferred()
00020         self.quiet = False
00021         autoshutdown = False
00022         while True:
00023             if args[0] == Quiet:
00024                 self.quiet = True
00025             elif args[0] == AutoShutdown:
00026                 autoshutdown = True
00027             else:
00028                 break
00029             args = args[1:]
00030         self.proc = None
00031         self.args = args
00032         self.shutdown_deferreds = []
00033         if autoshutdown:
00034             self.shutdown_trigger = reactor.addSystemEventTrigger('before', 'shutdown', self._shutdown)
00035         else:
00036             self.shutdown_trigger = None
00037         self.proc = reactor.spawnProcess(self, args[0], args, None)
00038     
00039     def errReceived(self, data):
00040         if not self.quiet:
00041             print >> sys.stderr, data
00042 
00043     def outReceived(self, data):
00044         if not self.quiet:
00045             print >> sys.stdout, data
00046 
00047     def processEnded(self, status_object):
00048         if self.shutdown_trigger:
00049             reactor.removeSystemEventTrigger(self.shutdown_trigger)
00050             self.shutdown_trigger = None
00051         self.deferred.callback(status_object.value.exitCode)
00052         for d in self.shutdown_deferreds:
00053             d.callback(status_object.value.exitCode)
00054 
00055     def _shutdown(self):
00056         """Called at system shutdown."""
00057         self.shutdown_trigger = None
00058         return self.shutdown()
00059 
00060     def shutdown(self):
00061         """Call this to interrupt the program. Returns a deferred that will
00062         be called when shutdown is complete."""
00063         d = Deferred()
00064         self.shutdown_deferreds.append(d)
00065         if self.proc:
00066             print "Killing command:", self.args
00067             self.proc.signalProcess("INT")
00068         else:
00069             d.callback()
00070         return d
00071 
00072 def system(*args):
00073     debug = False
00074     if debug:
00075         import time
00076         print time.time(), "system: ", args
00077     #print "system: ", args
00078     s = System(*args)
00079     if debug:
00080         def printout(value):
00081             print time.time(), "system done: ", args
00082             return value
00083         s.deferred.addCallback(printout)
00084     return s.deferred
00085 
00086 if __name__ == "__main__":
00087     import unittest
00088     from async_helpers import unittest_with_reactor, async_test
00089     import time
00090 
00091     class SystemTest(unittest.TestCase):
00092         @async_test
00093         def test_basic(self):
00094             """Runs a simple command."""
00095             retval = yield system('echo', 'Hello')
00096             self.assertEqual(retval, 0)
00097 
00098         @async_test
00099         def test_retval(self):
00100             """Checks that return values work."""
00101             retval = yield system('sh', '-c', 'exit 42')
00102             self.assertEqual(retval, 42)
00103 
00104         @async_test
00105         def test_no_return_early(self):
00106             """Checks that System waits until termination."""
00107             start = time.time()
00108             retval = yield system('sleep', '1')
00109             duration = time.time() - start
00110             self.assertEqual(retval, 0)
00111             self.assertTrue(duration >= 1, "%s > 1 is false"%duration)
00112 
00113     def run_ros_tests():
00114         import rostest
00115         rostest.unitrun('multi_interface_roam', 'system', SystemTest)
00116     
00117     unittest_with_reactor(run_ros_tests)


multi_interface_roam
Author(s): Blaise Gassend
autogenerated on Thu Apr 24 2014 15:34:18