introspection_test.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 import roslib; roslib.load_manifest('smach_viewer')
00004 import rospy
00005 import rostest
00006 
00007 import unittest
00008 
00009 from actionlib import *
00010 from actionlib.msg import *
00011 
00012 from smach import *
00013 
00014 from smach_msgs.msg import *
00015 from geometry_msgs.msg import PoseStamped
00016 
00017 
00018 # Static goals
00019 g1 = TestGoal(1) # This goal should succeed
00020 g2 = TestGoal(2) # This goal should abort
00021 g3 = TestGoal(3) # This goal should be rejected
00022 
00023 ### Custom tate classe
00024 class Setter(State):
00025   """State that sets the key 'a' in its userdata"""
00026   def __init__(self):
00027     State.__init__(self,default_outcome='done')
00028   def enter(self):
00029     rospy.sleep(0.5)
00030     self.userdata.a = 'A'
00031     self.userdata.a_message = PoseStamped()
00032     rospy.loginfo("Added key 'a'.")
00033     rospy.sleep(0.5)
00034     return 'done'
00035 
00036 class Getter(State):
00037   """State that grabs the key 'a' from userdata, and sets 'b'"""
00038   def __init__(self):
00039     State.__init__(self,default_outcome='done')
00040   def enter(self):
00041     rospy.sleep(0.5)
00042     while 'a' not in self.userdata:
00043       #rospy.loginfo("Waiting for key 'a' to appear. ")
00044       rospy.sleep(0.1)
00045     self.userdata.b = self.userdata.a
00046     return 'done'
00047 
00048 ### Test harness
00049 class TestStateMachine(unittest.TestCase):
00050   def test_introspection(self):
00051     """Test introspection system."""
00052     # Construct state machine
00053     sm = StateMachine(['aborted','preempted'])
00054     sm2 = StateMachine(['done'])
00055     sm3 = StateMachine(['done'])
00056     sm4 = StateMachine(['done'])
00057     sm5 = StateMachine(['done'])
00058 
00059 
00060     con_split = ConcurrentSplit(default_outcome = 'succeeded')
00061     con_split.add(
00062         ('SETTER', Setter()),
00063         ('RADICAL',
00064           sm5.add(
00065             ('T6',SPAState(),
00066               { 'succeeded':'SETTER',
00067                 'aborted':'T6',
00068                 'preempted':'SETTER'}),
00069             ('SETTER',Setter(),
00070               { 'done':'done'}) ) ),
00071         ('GETTER', Getter())
00072         )
00073     con_split.add_outcome_map(({'SETTER':'done'},'succeeded'))
00074 
00075 
00076     sm.add(
00077         state_machine.sequence('done',
00078           ('GETTER1', Getter(), {}),
00079           ('S2',
00080             sm2.add(
00081               ('SETTER', Setter(), {'done':'A_SPLIT'}),
00082               ('A_SPLIT', con_split, {'succeeded':'done'}) ),
00083             {} ),
00084           ('S3',
00085             sm3.add(
00086               ('SETTER', Setter(), {'done':'RADICAL'}),
00087               ('RADICAL',
00088                 sm4.add(
00089                   ('T5',SPAState(),
00090                     { 'succeeded':'SETTER',
00091                       'aborted':'T5',
00092                       'preempted':'SETTER'}),
00093                     ('SETTER',Setter(),{'done':'done'})
00094                   ),
00095                 {'done':'SETTER2'} ),
00096               ('SETTER2', Setter(), {'done':'done'})
00097               ),
00098             {'done':'TRINARY!'} )
00099           )
00100         )
00101 
00102     sm.add(('TRINARY!',SPAState(),{'succeeded':'T2','aborted':'T3','preempted':'T4'}))
00103     sm.add(
00104         state_machine.sequence('succeeded',
00105           ('T2',SPAState(),{}),
00106           ('T3',SPAState(),{'aborted':'S2'}),
00107           ('T4',SPAState(),{'succeeded':'GETTER2','aborted':'TRINARY!'})
00108           )
00109         )
00110 
00111     sm.add(('GETTER2', Getter(), {'done':'GETTER1'}))
00112 
00113     # Set default initial states
00114     sm.set_initial_state(['GETTER1'],smach.UserData())
00115     sm2.set_initial_state(['SETTER'],smach.UserData())
00116     sm3.set_initial_state(['SETTER'],smach.UserData())
00117     sm4.set_initial_state(['T5'],smach.UserData())
00118     sm5.set_initial_state(['T6'],smach.UserData())
00119 
00120     # Run introspector
00121     intro_server = smach.IntrospectionServer('intro_test',sm,'/intro_test')
00122 
00123     intro_client = smach.IntrospectionClient()
00124     servers = intro_client.get_servers()
00125 
00126     rospy.loginfo("Smach servers: "+str(servers))
00127     assert '/intro_test' in servers
00128 
00129     # Set initial state
00130     injected_ud = smach.UserData()
00131     injected_ud.a = 'A'
00132     init_set = intro_client.set_initial_state('/intro_test','/intro_test',['S2'],injected_ud,timeout = rospy.Duration(10.0))
00133 
00134     assert init_set
00135 
00136     sm.enter()
00137 
00138     assert sm.get_outcome() == 'done'
00139 
00140 
00141 
00142 
00143 def main():
00144   rospy.init_node('introspection_test',log_level=rospy.DEBUG)
00145   rostest.rosrun('smach_viewer', 'state_machine_test', TestStateMachine)
00146 
00147 if __name__=="__main__":
00148   main();


smach_viewer
Author(s): Jonathan Bohren
autogenerated on Sun Oct 5 2014 23:55:52