$search
#! /usr/bin/env python
"""Publish a piece of state and notify subscribers whenever it changes.

StatePublisher holds one value. CompositeStatePublisher derives its value
from several other publishers through a caller-supplied function.
"""

from __future__ import with_statement

import event

# TODO
# - Make CompositeStatePublisher so it can be garbage collected.
#   (CompositeStatePublisher.close() detaches it explicitly; automatic
#   collection is still not implemented.)


class StatePublisher():
    """Hold a single state value and notify subscribers when it changes.

    Subscriber callbacks receive ``old_state`` and ``new_state`` keyword
    arguments in addition to any extra arguments given to subscribe().
    """

    def __init__(self, init_state):
        """Create a publisher whose current state is init_state."""
        self._set_lock = event.ReentrantDetectingLock()
        self._state = init_state
        self._event = event.Event()

    def get(self):
        """Return the current state."""
        return self._state

    def _set_nolock(self, new_state):
        """Store new_state and trigger the event if it differs from the
        current state.

        Does not take self._set_lock; the callers in this file hold it.
        """
        if self._state != new_state:
            old_state = self._state
            self._state = new_state
            self._event.trigger(old_state = old_state, new_state = new_state)
            # NOTE(review): nothing in this file reads _setting_thread; it
            # looks like a leftover -- confirm before removing.
            self._setting_thread = None

    def set(self, new_state):
        """Change the state to new_state, notifying subscribers if it
        differs from the current state."""
        with self._set_lock("Tried to set state from state callback."):
            self._set_nolock(new_state)

    def subscribe(self, *args, **kwargs):
        """Register a callback and immediately report the current state to it.

        All arguments are forwarded to the event's subscribe_repeating().
        The new subscription is triggered once with old_state=None and
        new_state set to the current state.

        Returns the subscription handle.
        """
        with self._set_lock("Tried to subscribe from state callback."):
            h = self._event.subscribe_repeating(*args, **kwargs)
            h._trigger((), {'old_state' : None, 'new_state' : self._state})
            return h


class CompositeStatePublisher(StatePublisher):
    """Publish func(states), where states is the list of the source
    publishers' current states, in the order the publishers were given.

    The composite state is None until every source has reported its state
    once (which happens while subscribing in __init__); with no sources it
    stays None. After that it is recomputed on every source change.
    """

    def __init__(self, func, state_publishers):
        """Subscribe to each publisher in state_publishers.

        func is called with the list of source states and returns the
        composite state.
        """
        StatePublisher.__init__(self, None)
        n = len(state_publishers)
        self._func = func
        self._states = n * [ None ]
        self._has_been_set = n * [ False ]
        # Keep the subscription handles so close() can detach from the
        # sources later. Must exist before subscribing, because subscribe()
        # invokes _cb right away.
        self._handles = []
        for i, publisher in enumerate(state_publishers):
            self._handles.append(publisher.subscribe(self._cb, i))

    def _cb(self, idx, old_state, new_state):
        """Record the new state of source number idx and update the
        composite state.

        Raises Exception if old_state does not match the state last
        recorded for that source.
        """
        if self._states[idx] != old_state:
            raise Exception("Unexpected state: %s != %s"%(self._states[idx], old_state))
        self._states[idx] = new_state
        # NOTE(review): the other call sites use self._set_lock(<message>);
        # the bare form is kept exactly as in the original.
        with self._set_lock:
            all_set = all(self._has_been_set)
            if not all_set:
                self._has_been_set[idx] = True
                if all(self._has_been_set):
                    # all([]) is True, so an empty list means "every source
                    # has reported" from now on.
                    self._has_been_set = [] # Silly optimization
                    all_set = True
            if all_set:
                self._set_nolock(self._func(self._states))

    def close(self):
        """Unsubscribe from every source publisher.

        After this call, changes to the sources no longer reach this
        object. Calling it again does nothing.
        """
        handles, self._handles = self._handles, []
        for h in handles:
            h.unsubscribe()

    def set(self, new_state):
        """Always raise: the composite state is derived, never set directly."""
        raise Exception("set method called on CompositeStatePublisher")


if __name__ == "__main__":
    import unittest
    import sys

    def state_logger(log, old_state, new_state):
        log.append((old_state, new_state))

    class BasicTest(unittest.TestCase):
        def test_basic(self):
            s = StatePublisher(False)
            log = []
            h1 = s.subscribe(state_logger, log)
            h2 = s.subscribe(state_logger, log)
            s.set(1)
            h1.unsubscribe()
            s.set(2)
            h2.unsubscribe()
            self.assertEqual(log, [
                (None, False),
                (None, False),
                (False, 1),
                (False, 1),
                (1, 2),
                ])

        def test_composite(self):
            s1 = StatePublisher(0)
            s2 = StatePublisher(0)
            s = CompositeStatePublisher(sum, [s1, s2])
            log = []
            s.subscribe(state_logger, log)
            s1.set(1)
            s2.set(2)
            s1.set(5)
            self.assertEqual(log, [ (None, 0), (0, 1), (1, 3), (3, 7) ])

        def test_composite_close(self):
            s1 = StatePublisher(0)
            s2 = StatePublisher(0)
            s = CompositeStatePublisher(sum, [s1, s2])
            s1.set(1)
            s.close()
            s.close()
            s2.set(2)
            self.assertEqual(s.get(), 1)

    if len(sys.argv) > 1 and sys.argv[1].startswith("--gtest_output="):
        import roslib; roslib.load_manifest('multi_interface_roam')
        import rostest
        rostest.unitrun('multi_interface_roam', 'state_publisher_basic', BasicTest)
    else:
        unittest.main()