$search
00001 #! /usr/bin/env python 00002 00003 from __future__ import with_statement 00004 00005 import event 00006 import copy 00007 00008 # TODO 00009 # - Make CompositeStatePublisher so it can be garbage collected. 00010 # - Add test for Unsubscribe exception, in particular case whene it is 00011 # raised directly from subscribe. 00012 # - Add test for CompositeStatePublisher. 00013 # - Add test that mutable states get copied when stored into old_state. 00014 00015 class JustSubscribed: 00016 pass 00017 00018 class StatePublisher(): 00019 def __init__(self, init_state): 00020 self._state = init_state 00021 self._event = event.Event() 00022 00023 def get(self): 00024 return self._state 00025 00026 def set(self, new_state): 00027 if self._state != new_state: 00028 old_state = self._state 00029 self._state = copy.deepcopy(new_state) 00030 self._event.trigger(old_state = old_state, new_state = new_state) 00031 self._setting_thread = None 00032 00033 def subscribe(self, *args, **kwargs): 00034 f = args[0] 00035 args = args[1:] 00036 h = self._event.subscribe_repeating(f, *args, **kwargs) 00037 allkwargs = {'old_state' : JustSubscribed, 'new_state' : self._state} 00038 allkwargs.update(kwargs) 00039 try: 00040 f(*args, **allkwargs) 00041 except event.Unsubscribe: 00042 h.unsubscribe() 00043 return h 00044 00045 class CompositeStatePublisher(StatePublisher): 00046 def __init__(self, func, state_publishers): 00047 StatePublisher.__init__(self, None) 00048 n = len(state_publishers) 00049 self._func = func 00050 self._states = n * [ JustSubscribed ] 00051 self._has_been_set = n * [ False ] 00052 for i in range(0, n): 00053 state_publishers[i].subscribe(self._cb, i) 00054 00055 _set = StatePublisher.set 00056 00057 def _cb(self, idx, old_state, new_state): 00058 if self._states[idx] != old_state: 00059 raise Exception("Unexpected state: %s != %s"%(self._states[idx], old_state)) 00060 self._states[idx] = new_state 00061 all_set = all(self._has_been_set) 00062 if not all_set: 00063 self._has_been_set[idx] = True 00064 if all(self._has_been_set): 00065 self._has_been_set = [] # Silly optimization 00066 all_set = True 00067 if all_set: 00068 self._set(self._func(self._states)) 00069 00070 def set(self, new_state): 00071 raise Exception("set method called on CompositeStatePublisher") 00072 00073 if __name__ == "__main__": 00074 import unittest 00075 import sys 00076 00077 def state_logger(l, old_state, new_state): 00078 l.append((old_state, new_state)) 00079 00080 class BasicTest(unittest.TestCase): 00081 def test_basic(self): 00082 s = StatePublisher(False) 00083 l = [] 00084 h1 = s.subscribe(state_logger, l) 00085 h2 = s.subscribe(state_logger, l) 00086 s.set(1) 00087 h1.unsubscribe() 00088 s.set(2) 00089 h2.unsubscribe() 00090 self.assertEqual(l, [ 00091 (JustSubscribed, False), 00092 (JustSubscribed, False), 00093 (False, 1), 00094 (False, 1), 00095 (1, 2), 00096 ]) 00097 00098 def test_composite(self): 00099 s1 = StatePublisher(0) 00100 s2 = StatePublisher(0) 00101 s = CompositeStatePublisher(sum, [s1, s2]) 00102 l = [] 00103 s.subscribe(state_logger, l) 00104 s1.set(1) 00105 s2.set(2) 00106 s1.set(5) 00107 self.assertEqual(l, [ (JustSubscribed, 0), (0, 1), (1, 3), (3, 7) ]) 00108 00109 if len(sys.argv) > 1 and sys.argv[1].startswith("--gtest_output="): 00110 import roslib; roslib.load_manifest('multi_interface_roam') 00111 import rostest 00112 rostest.unitrun('multi_interface_roam', 'state_publisher_basic', BasicTest) 00113 else: 00114 unittest.main()