00001
00002
00003 from __future__ import with_statement
00004
00005 import event
00006
00007
00008
00009
class StatePublisher():
    """Hold a single state value and notify subscribers when it changes.

    Callbacks are invoked with ``old_state`` and ``new_state`` keyword
    arguments.  A new subscriber is immediately sent the current state
    with ``old_state=None``.
    """

    def __init__(self, init_state):
        """Create a publisher whose current state is ``init_state``."""
        # Taken by set() and subscribe().  The string passed when taking the
        # lock is presumably the error reported on re-entry from a callback
        # -- TODO confirm against event.ReentrantDetectingLock.
        self._set_lock = event.ReentrantDetectingLock()
        self._state = init_state
        self._event = event.Event()

    def get(self):
        """Return the current state (read without taking the lock)."""
        return self._state

    def _set_nolock(self, new_state):
        """Store ``new_state`` and fire the change event if it differs.

        Does not take ``self._set_lock`` itself; set() calls it with the
        lock held.  Nothing is triggered when the new value compares equal
        to the current one.
        """
        # Keep the `!=` test (rather than `not ==`) so objects that define
        # the two comparisons independently behave as before.
        if self._state != new_state:
            old_state = self._state
            self._state = new_state
            self._event.trigger(old_state=old_state, new_state=new_state)

    def set(self, new_state):
        """Change the state to ``new_state`` and notify subscribers."""
        with self._set_lock("Tried to set state from state callback."):
            self._set_nolock(new_state)

    def subscribe(self, *args, **kwargs):
        """Register a repeating callback and send it the current state.

        All arguments are forwarded to ``self._event.subscribe_repeating``.
        The new subscription is triggered once right away with
        ``old_state=None`` and ``new_state`` set to the current state.

        Returns the handle produced by ``subscribe_repeating``.
        """
        with self._set_lock("Tried to subscribe from state callback."):
            h = self._event.subscribe_repeating(*args, **kwargs)
            # Only the new subscriber gets this initial notification.
            h._trigger((), {'old_state': None, 'new_state': self._state})
        return h
00035
class CompositeStatePublisher(StatePublisher):
    """Publish a state derived from several other state publishers.

    ``func`` is called with the list of the sources' latest states and its
    result becomes this publisher's state.  Nothing is computed until every
    source has reported at least once.
    """

    def __init__(self, func, state_publishers):
        """Subscribe to each entry of ``state_publishers``.

        ``func`` receives a list with one state per source, in the same
        order as ``state_publishers``.
        """
        StatePublisher.__init__(self, None)
        self._func = func
        count = len(state_publishers)
        # Latest state seen from each source, and whether it has reported.
        self._states = [None] * count
        self._has_been_set = [False] * count
        for position in range(count):
            # The source index is bound as the callback's first argument.
            state_publishers[position].subscribe(self._cb, position)

    def _cb(self, idx, old_state, new_state):
        """Record a change of source ``idx`` and recompute the state.

        Raises Exception if ``old_state`` does not match the state last
        recorded for that source.
        """
        recorded = self._states[idx]
        if recorded != old_state:
            raise Exception("Unexpected state: %s != %s"%(recorded, old_state))
        self._states[idx] = new_state
        # NOTE(review): the lock is entered here without a message argument,
        # unlike StatePublisher.set/subscribe -- confirm that
        # event.ReentrantDetectingLock supports both forms.
        with self._set_lock:
            if not all(self._has_been_set):
                self._has_been_set[idx] = True
                if not all(self._has_been_set):
                    # Still waiting for at least one source to report.
                    return
                # all([]) is True, so an empty list marks "every source has
                # reported" and skips this bookkeeping on later callbacks.
                self._has_been_set = []
            self._set_nolock(self._func(self._states))

    def set(self, new_state):
        """Always raise: the state is derived and cannot be set directly."""
        raise Exception("set method called on CompositeStatePublisher")
00062
if __name__ == "__main__":
    import unittest
    import sys

    def state_logger(log, old_state, new_state):
        """Append the (old_state, new_state) pair to ``log``."""
        log.append((old_state, new_state))

    class BasicTest(unittest.TestCase):
        """Self-tests for StatePublisher and CompositeStatePublisher."""

        def test_basic(self):
            """Two subscribers see changes until each unsubscribes."""
            publisher = StatePublisher(False)
            seen = []
            first = publisher.subscribe(state_logger, seen)
            second = publisher.subscribe(state_logger, seen)
            publisher.set(1)
            first.unsubscribe()
            publisher.set(2)
            second.unsubscribe()
            expected = [
                (None, False),
                (None, False),
                (False, 1),
                (False, 1),
                (1, 2),
            ]
            self.assertEqual(seen, expected)

        def test_composite(self):
            """The composite publishes the sum of its two sources."""
            left = StatePublisher(0)
            right = StatePublisher(0)
            total = CompositeStatePublisher(sum, [left, right])
            seen = []
            total.subscribe(state_logger, seen)
            left.set(1)
            right.set(2)
            left.set(5)
            expected = [(None, 0), (0, 1), (1, 3), (3, 7)]
            self.assertEqual(seen, expected)

    # With a --gtest_output= first argument the test is run through rostest;
    # otherwise plain unittest is used.
    use_rostest = len(sys.argv) > 1 and sys.argv[1].startswith("--gtest_output=")
    if use_rostest:
        import roslib
        roslib.load_manifest('multi_interface_roam')
        import rostest
        rostest.unitrun('multi_interface_roam', 'state_publisher_basic', BasicTest)
    else:
        unittest.main()