state_publisher.py
Go to the documentation of this file.
00001 #! /usr/bin/env python 
00002 
00003 from __future__ import with_statement
00004 
00005 import event
00006 import copy
00007 
00008 # TODO
00009 # - Make CompositeStatePublisher so it can be garbage collected.
00010 # - Add test for the Unsubscribe exception, in particular the case when it
00011 # is raised directly from subscribe.
00012 # - Add test for CompositeStatePublisher.
00013 # - Add test that mutable states get copied when stored into old_state.
00014 
class JustSubscribed:
    """Sentinel used as ``old_state`` in the callback made at subscribe time.

    The class object itself (never an instance) is passed around and compared,
    marking "there was no previous state, you have only just subscribed".
    """
00017 
class StatePublisher():
    """Hold one state value and notify subscribers whenever it changes.

    The publisher keeps its own deep copy of the state, so later mutation of
    an object handed to the constructor or to set() does not silently alter
    the stored value (which would defeat the change detection in set()).
    """

    def __init__(self, init_state):
        """Create a publisher whose current state is a snapshot of init_state.

        init_state -- initial value; it is deep-copied, exactly as set() does
                      for every later value.
        """
        # Store a private copy: if the caller's object were kept by reference,
        # mutating it and passing it back to set() would compare the object
        # with itself and the change would never be published.
        self._state = copy.deepcopy(init_state)
        self._event = event.Event()

    def get(self):
        """Return the currently stored state (the internal object, not a copy)."""
        return self._state

    def set(self, new_state):
        """Store new_state and notify subscribers if it differs from the current state.

        Nothing happens when the stored state already compares equal to
        new_state (the check uses ``!=``).  Otherwise a deep copy of new_state
        is stored and the event is triggered with keyword arguments
        ``old_state`` (the previously stored value) and ``new_state`` (the
        object passed in by the caller).
        """
        if self._state != new_state:
            old_state = self._state
            self._state = copy.deepcopy(new_state)
            self._event.trigger(old_state = old_state, new_state = new_state)

    def subscribe(self, *args, **kwargs):
        """Register a callback and immediately call it once with the current state.

        The first positional argument is the callback; the remaining
        positional and keyword arguments are forwarded to it on every call.
        The callback is registered through the event's subscribe_repeating()
        and is then invoked right away with ``old_state=JustSubscribed`` and
        ``new_state=<current state>`` (caller-supplied keyword arguments with
        the same names take precedence).

        If that immediate call raises event.Unsubscribe, the subscription is
        cancelled again.  The handle returned by subscribe_repeating() is
        returned in every case.

        Raises IndexError when called without a callback.
        """
        f = args[0]
        args = args[1:]
        h = self._event.subscribe_repeating(f, *args, **kwargs)
        allkwargs = {'old_state' : JustSubscribed, 'new_state' : self._state}
        allkwargs.update(kwargs)
        try:
            f(*args, **allkwargs)
        except event.Unsubscribe:
            # The callback asked to be removed during its very first call.
            h.unsubscribe()
        return h
00044 
class CompositeStatePublisher(StatePublisher):
    """Publish a state computed from the states of several other publishers.

    The composite subscribes to every source publisher, remembers the most
    recent state of each, and publishes ``func(states)`` (states being the
    list of remembered values, in source order) each time a source reports,
    starting once every source has reported at least once.
    """

    def __init__(self, func, state_publishers):
        """Subscribe to each publisher in state_publishers.

        func             -- called with the list of latest source states; its
                            result becomes this publisher's state.
        state_publishers -- indexable sequence of source publishers.
        """
        StatePublisher.__init__(self, None)
        self._func = func
        count = len(state_publishers)
        # Until a source reports, its slot holds the JustSubscribed sentinel.
        self._states = [ JustSubscribed ] * count
        self._has_been_set = [ False ] * count
        for position in range(count):
            source = state_publishers[position]
            source.subscribe(self._cb, position)

    # Keep the real setter reachable for internal use; the public set() below
    # is overridden to refuse external writes.
    _set = StatePublisher.set

    def _cb(self, idx, old_state, new_state):
        """Record the new state of source idx and republish when possible.

        Raises Exception if old_state does not match the state last recorded
        for that source.
        """
        recorded = self._states[idx]
        if recorded != old_state:
            raise Exception("Unexpected state: %s != %s"%(recorded, old_state))
        self._states[idx] = new_state
        if not all(self._has_been_set):
            self._has_been_set[idx] = True
            if not all(self._has_been_set):
                # Still waiting for at least one source to report.
                return
            # Silly optimization: all([]) is True, so from now on the
            # "everything reported" check above is trivially satisfied.
            self._has_been_set = []
        self._set(self._func(self._states))

    def set(self, new_state):
        """Always raise: a composite state is derived and cannot be set directly."""
        raise Exception("set method called on CompositeStatePublisher")
00072 
if __name__ == "__main__":
    import unittest
    import sys

    def state_logger(log, old_state, new_state):
        """Append each (old_state, new_state) notification to log."""
        log.append((old_state, new_state))

    class BasicTest(unittest.TestCase):
        """Exercises StatePublisher and CompositeStatePublisher notifications."""

        def test_basic(self):
            # Two subscribers share one log; each gets an initial
            # JustSubscribed call, then only active ones see later changes.
            publisher = StatePublisher(False)
            seen = []
            first = publisher.subscribe(state_logger, seen)
            second = publisher.subscribe(state_logger, seen)
            publisher.set(1)
            first.unsubscribe()
            publisher.set(2)
            second.unsubscribe()
            expected = [
                (JustSubscribed, False),
                (JustSubscribed, False),
                (False, 1),
                (False, 1),
                (1, 2),
                ]
            self.assertEqual(seen, expected)

        def test_composite(self):
            # The composite publishes the sum of its two sources.
            left = StatePublisher(0)
            right = StatePublisher(0)
            total = CompositeStatePublisher(sum, [left, right])
            seen = []
            total.subscribe(state_logger, seen)
            left.set(1)
            right.set(2)
            left.set(5)
            expected = [ (JustSubscribed, 0), (0, 1), (1, 3), (3, 7) ]
            self.assertEqual(seen, expected)

    # A leading --gtest_output= argument selects the rostest runner
    # (presumably supplied by the ROS test harness); otherwise run as a
    # plain unittest script.
    wants_rostest = len(sys.argv) > 1 and sys.argv[1].startswith("--gtest_output=")
    if wants_rostest:
        import roslib
        roslib.load_manifest('multi_interface_roam')
        import rostest
        rostest.unitrun('multi_interface_roam', 'state_publisher_basic', BasicTest)
    else:
        unittest.main()


multi_interface_roam
Author(s): Blaise Gassend
autogenerated on Wed Sep 16 2015 04:38:27