00001
00002
00003 from __future__ import with_statement
00004
00005 import event
00006 import copy
00007
00008
00009
00010
00011
00012
00013
00014
class JustSubscribed:
    """Sentinel used in place of a real "old state".

    StatePublisher.subscribe passes this class itself (not an instance) as
    ``old_state`` for the initial callback made at subscription time, and
    CompositeStatePublisher uses it as the initial value of every tracked
    input state.
    """
00017
class StatePublisher():
    """Hold a state value and notify subscribers whenever it changes.

    Callbacks receive the keyword arguments ``old_state`` and ``new_state``
    (plus whatever extra positional/keyword arguments were given to
    ``subscribe``).
    """

    def __init__(self, init_state):
        """Create a publisher whose current state is ``init_state``.

        ``init_state`` is stored as given; it is not copied.
        """
        self._state = init_state
        self._event = event.Event()

    def get(self):
        """Return the currently stored state object."""
        return self._state

    def set(self, new_state):
        """Store ``new_state`` and notify subscribers if it differs.

        Nothing happens when ``new_state`` compares equal to the current
        state.  Otherwise a deep copy of ``new_state`` is stored, so later
        mutation of the caller's object cannot silently change the
        published state, and the event is triggered with ``old_state`` and
        ``new_state`` keyword arguments.
        """
        if self._state != new_state:
            old_state = self._state
            stored_state = copy.deepcopy(new_state)
            self._state = stored_state
            # Publish the stored copy rather than the caller's object, so
            # subscribers see exactly the object that get() returns and
            # that will be reported as old_state on the next change.
            self._event.trigger(old_state=old_state, new_state=stored_state)
            # NOTE(review): this attribute is not read anywhere in the code
            # visible here; kept in case something external looks at it.
            self._setting_thread = None

    def subscribe(self, *args, **kwargs):
        """Register a callback and immediately call it with the current state.

        The first positional argument is the callback; the remaining
        positional and keyword arguments are forwarded to it on every call.
        The callback is registered with the underlying event via
        ``subscribe_repeating`` and then invoked once right away with
        ``old_state=JustSubscribed`` and ``new_state`` set to the current
        state (caller-supplied keyword arguments of the same name take
        precedence).  If that initial call raises ``event.Unsubscribe`` the
        subscription is cancelled.

        Returns the handle obtained from ``subscribe_repeating``.
        Raises IndexError if no callback is given.
        """
        f = args[0]
        args = args[1:]
        h = self._event.subscribe_repeating(f, *args, **kwargs)
        allkwargs = {'old_state' : JustSubscribed, 'new_state' : self._state}
        allkwargs.update(kwargs)
        try:
            f(*args, **allkwargs)
        except event.Unsubscribe:
            h.unsubscribe()
        return h
00044
class CompositeStatePublisher(StatePublisher):
    """Publish a state computed from the states of several publishers.

    ``func`` is called with a list holding the latest state of every input
    publisher (in the order they were given) and its result becomes this
    publisher's state.  ``func`` is first called once every input has
    reported a state, and again after each later input change.  With no
    input publishers no callback ever fires, so the state stays ``None``.
    """

    def __init__(self, func, state_publishers):
        """Subscribe to every publisher in ``state_publishers``.

        ``state_publishers`` may be any iterable; it is materialised once.
        Each publisher's ``subscribe`` is called with ``self._cb`` and the
        publisher's index as the extra positional argument.
        """
        StatePublisher.__init__(self, None)
        publishers = list(state_publishers)
        count = len(publishers)
        self._func = func
        # Latest known state per input; JustSubscribed means "none yet" and
        # matches the old_state of the initial subscription callback.
        self._states = count * [JustSubscribed]
        # Per-input flags; replaced by an empty list once all have reported.
        self._has_been_set = count * [False]
        for idx, publisher in enumerate(publishers):
            publisher.subscribe(self._cb, idx)

    # Keep the inherited setter reachable internally; the public set() below
    # is disabled.
    _set = StatePublisher.set

    def _cb(self, idx, old_state, new_state):
        """Record input ``idx`` changing state and recompute the composite.

        Raises Exception if ``old_state`` does not match the state last
        recorded for that input.
        """
        if self._states[idx] != old_state:
            raise Exception("Unexpected state: %s != %s"%(self._states[idx], old_state))
        self._states[idx] = new_state
        all_set = all(self._has_been_set)
        if not all_set:
            self._has_been_set[idx] = True
            if all(self._has_been_set):
                # all([]) is True, so emptying the list makes every later
                # callback take the fast path above.
                self._has_been_set = []
                all_set = True
        if all_set:
            # Hand func a copy: self._states is updated in place on every
            # callback, so a func that returns or keeps its argument must
            # not end up aliasing it.
            self._set(self._func(list(self._states)))

    def set(self, new_state):
        """Always raise: the composite state is derived, never set directly."""
        raise Exception("set method called on CompositeStatePublisher")
00072
if __name__ == "__main__":
    import sys
    import unittest

    def state_logger(log, old_state, new_state):
        """Record one (old_state, new_state) transition in ``log``."""
        transition = (old_state, new_state)
        log.append(transition)

    class BasicTest(unittest.TestCase):
        """Self-tests for StatePublisher and CompositeStatePublisher."""

        def test_basic(self):
            # Two subscribers share one log; each gets the initial callback,
            # both see the first change, only the second sees the last one.
            publisher = StatePublisher(False)
            seen = []
            first = publisher.subscribe(state_logger, seen)
            second = publisher.subscribe(state_logger, seen)
            publisher.set(1)
            first.unsubscribe()
            publisher.set(2)
            second.unsubscribe()
            expected = [
                (JustSubscribed, False),
                (JustSubscribed, False),
                (False, 1),
                (False, 1),
                (1, 2),
            ]
            self.assertEqual(seen, expected)

        def test_composite(self):
            # The composite publishes the sum of its two inputs.
            left = StatePublisher(0)
            right = StatePublisher(0)
            total = CompositeStatePublisher(sum, [left, right])
            seen = []
            total.subscribe(state_logger, seen)
            left.set(1)
            right.set(2)
            left.set(5)
            expected = [(JustSubscribed, 0), (0, 1), (1, 3), (3, 7)]
            self.assertEqual(seen, expected)

    # A first argument starting with --gtest_output= selects the rostest
    # runner; anything else falls back to plain unittest.
    cli_args = sys.argv[1:]
    if cli_args and cli_args[0].startswith("--gtest_output="):
        import roslib
        roslib.load_manifest('multi_interface_roam')
        import rostest
        rostest.unitrun('multi_interface_roam', 'state_publisher_basic', BasicTest)
    else:
        unittest.main()