state_publisher.py
Go to the documentation of this file.
00001 #! /usr/bin/env python
00002 
00003 from __future__ import with_statement
00004 
00005 import event
00006 
00007 # TODO
00008 # - Make CompositeStatePublisher so it can be garbage collected.
00009 
class StatePublisher:
    """Hold a single state value and announce changes through an event.

    Subscribers registered with subscribe() are handed the current state
    right away (with old_state=None) and are then notified through the
    internal event each time set() stores a value that differs from the
    current one.
    """

    def __init__(self, init_state):
        """Create the publisher with init_state as its starting value."""
        # The lock is called with a message and used as a context manager
        # in set()/subscribe(); presumably the message is reported when a
        # callback re-enters the publisher -- confirm in the event module.
        self._set_lock = event.ReentrantDetectingLock()
        self._state = init_state
        self._event = event.Event()

    def get(self):
        """Return the most recently stored state (no locking is done)."""
        return self._state

    def _set_nolock(self, new_state):
        """Store new_state and fire the event; caller must hold the lock.

        Nothing happens when new_state does not compare unequal to the
        current state.
        """
        changed = self._state != new_state
        if not changed:
            return
        previous, self._state = self._state, new_state
        self._event.trigger(old_state=previous, new_state=new_state)
        # Nothing in this file reads _setting_thread; the assignment is
        # kept so instances carry the same attributes as before.
        self._setting_thread = None

    def set(self, new_state):
        """Change the state to new_state while holding the set lock."""
        guard = self._set_lock("Tried to set state from state callback.")
        with guard:
            self._set_nolock(new_state)

    def subscribe(self, *args, **kwargs):
        """Register a repeating subscriber and send it the current state.

        All arguments are forwarded to the event's subscribe_repeating().
        The new handle is triggered once immediately with old_state=None
        and new_state set to the current state, then returned.
        """
        guard = self._set_lock("Tried to subscribe from state callback.")
        with guard:
            handle = self._event.subscribe_repeating(*args, **kwargs)
            # Initial notification goes only to this subscriber.
            initial = dict(old_state=None, new_state=self._state)
            handle._trigger((), initial)
        return handle
00035 
class CompositeStatePublisher(StatePublisher):
    """Publish a state computed from the states of several publishers.

    The composite subscribes to every publisher in state_publishers and
    keeps a list with the latest state of each one.  Once every source
    has reported at least once, func is applied to that list after each
    report and the result becomes the composite's own state.
    """

    def __init__(self, func, state_publishers):
        """Subscribe to each source publisher.

        func receives the list of latest source states (in the order of
        state_publishers) and returns the composite state.
        """
        StatePublisher.__init__(self, None)
        count = len(state_publishers)
        # These must exist before subscribing: the StatePublisher in this
        # file triggers the callback from inside subscribe().
        self._func = func
        self._states = [None] * count
        self._has_been_set = [False] * count
        for position in range(count):
            state_publishers[position].subscribe(self._cb, position)

    def _cb(self, idx, old_state, new_state):
        """Record a state change of source idx and recompute the state.

        Raises Exception when old_state does not match the state last
        recorded for that source.
        """
        recorded = self._states[idx]
        if recorded != old_state:
            raise Exception("Unexpected state: %s != %s"%(recorded, old_state))
        self._states[idx] = new_state
        with self._set_lock:
            if not all(self._has_been_set):
                self._has_been_set[idx] = True
                if not all(self._has_been_set):
                    # Still waiting for some source's first report.
                    return
                # all([]) is True, so an empty list makes every later
                # check succeed without scanning the flags again.
                self._has_been_set = []
            self._set_nolock(self._func(self._states))

    def set(self, new_state):
        """Always raise: a composite state cannot be set directly."""
        raise Exception("set method called on CompositeStatePublisher")
00062 
if __name__ == "__main__":
    import unittest
    import sys

    def state_logger(log, old_state, new_state):
        """Subscriber callback that appends each transition to log."""
        log.append((old_state, new_state))

    class BasicTest(unittest.TestCase):
        """Self-tests for StatePublisher and CompositeStatePublisher."""

        def test_basic(self):
            # Two subscribers share one log; each gets the initial state,
            # and only the remaining one sees the second change.
            publisher = StatePublisher(False)
            seen = []
            first = publisher.subscribe(state_logger, seen)
            second = publisher.subscribe(state_logger, seen)
            publisher.set(1)
            first.unsubscribe()
            publisher.set(2)
            second.unsubscribe()
            expected = [
                (None, False),
                (None, False),
                (False, 1),
                (False, 1),
                (1, 2),
            ]
            self.assertEqual(seen, expected)

        def test_composite(self):
            # The composite state is the sum of the two source states.
            left = StatePublisher(0)
            right = StatePublisher(0)
            total = CompositeStatePublisher(sum, [left, right])
            seen = []
            total.subscribe(state_logger, seen)
            left.set(1)
            right.set(2)
            left.set(5)
            self.assertEqual(seen, [ (None, 0), (0, 1), (1, 3), (3, 7) ])

    # A first argument starting with --gtest_output= selects the rostest
    # runner; otherwise the plain unittest runner is used.
    wants_rostest = len(sys.argv) > 1 and sys.argv[1].startswith("--gtest_output=")
    if wants_rostest:
        import roslib
        roslib.load_manifest('multi_interface_roam')
        import rostest
        rostest.unitrun('multi_interface_roam', 'state_publisher_basic', BasicTest)
    else:
        unittest.main()


multi_interface_roam
Author(s): Blaise Gassend
autogenerated on Wed Sep 16 2015 04:38:27