$search
00001 #!/usr/bin/env python 00002 00003 import roslib; roslib.load_manifest('smach_viewer') 00004 import rospy 00005 import rostest 00006 00007 import unittest 00008 00009 from actionlib import * 00010 from actionlib.msg import * 00011 00012 from smach import * 00013 00014 from smach_msgs.msg import * 00015 from geometry_msgs.msg import PoseStamped 00016 00017 00018 # Static goals 00019 g1 = TestGoal(1) # This goal should succeed 00020 g2 = TestGoal(2) # This goal should abort 00021 g3 = TestGoal(3) # This goal should be rejected 00022 00023 ### Custom tate classe 00024 class Setter(State): 00025 """State that sets the key 'a' in its userdata""" 00026 def __init__(self): 00027 State.__init__(self,default_outcome='done') 00028 def enter(self): 00029 rospy.sleep(0.5) 00030 self.userdata.a = 'A' 00031 self.userdata.a_message = PoseStamped() 00032 rospy.loginfo("Added key 'a'.") 00033 rospy.sleep(0.5) 00034 return 'done' 00035 00036 class Getter(State): 00037 """State that grabs the key 'a' from userdata, and sets 'b'""" 00038 def __init__(self): 00039 State.__init__(self,default_outcome='done') 00040 def enter(self): 00041 rospy.sleep(0.5) 00042 while 'a' not in self.userdata: 00043 #rospy.loginfo("Waiting for key 'a' to appear. ") 00044 rospy.sleep(0.1) 00045 self.userdata.b = self.userdata.a 00046 return 'done' 00047 00048 ### Test harness 00049 class TestStateMachine(unittest.TestCase): 00050 def test_introspection(self): 00051 """Test introspection system.""" 00052 # Construct state machine 00053 sm = StateMachine(['aborted','preempted']) 00054 sm2 = StateMachine(['done']) 00055 sm3 = StateMachine(['done']) 00056 sm4 = StateMachine(['done']) 00057 sm5 = StateMachine(['done']) 00058 00059 00060 con_split = ConcurrentSplit(default_outcome = 'succeeded') 00061 con_split.add( 00062 ('SETTER', Setter()), 00063 ('RADICAL', 00064 sm5.add( 00065 ('T6',SPAState(), 00066 { 'succeeded':'SETTER', 00067 'aborted':'T6', 00068 'preempted':'SETTER'}), 00069 ('SETTER',Setter(), 00070 { 'done':'done'}) ) ), 00071 ('GETTER', Getter()) 00072 ) 00073 con_split.add_outcome_map(({'SETTER':'done'},'succeeded')) 00074 00075 00076 sm.add( 00077 state_machine.sequence('done', 00078 ('GETTER1', Getter(), {}), 00079 ('S2', 00080 sm2.add( 00081 ('SETTER', Setter(), {'done':'A_SPLIT'}), 00082 ('A_SPLIT', con_split, {'succeeded':'done'}) ), 00083 {} ), 00084 ('S3', 00085 sm3.add( 00086 ('SETTER', Setter(), {'done':'RADICAL'}), 00087 ('RADICAL', 00088 sm4.add( 00089 ('T5',SPAState(), 00090 { 'succeeded':'SETTER', 00091 'aborted':'T5', 00092 'preempted':'SETTER'}), 00093 ('SETTER',Setter(),{'done':'done'}) 00094 ), 00095 {'done':'SETTER2'} ), 00096 ('SETTER2', Setter(), {'done':'done'}) 00097 ), 00098 {'done':'TRINARY!'} ) 00099 ) 00100 ) 00101 00102 sm.add(('TRINARY!',SPAState(),{'succeeded':'T2','aborted':'T3','preempted':'T4'})) 00103 sm.add( 00104 state_machine.sequence('succeeded', 00105 ('T2',SPAState(),{}), 00106 ('T3',SPAState(),{'aborted':'S2'}), 00107 ('T4',SPAState(),{'succeeded':'GETTER2','aborted':'TRINARY!'}) 00108 ) 00109 ) 00110 00111 sm.add(('GETTER2', Getter(), {'done':'GETTER1'})) 00112 00113 # Set default initial states 00114 sm.set_initial_state(['GETTER1'],smach.UserData()) 00115 sm2.set_initial_state(['SETTER'],smach.UserData()) 00116 sm3.set_initial_state(['SETTER'],smach.UserData()) 00117 sm4.set_initial_state(['T5'],smach.UserData()) 00118 sm5.set_initial_state(['T6'],smach.UserData()) 00119 00120 # Run introspector 00121 intro_server = smach.IntrospectionServer('intro_test',sm,'/intro_test') 00122 00123 intro_client = smach.IntrospectionClient() 00124 servers = intro_client.get_servers() 00125 00126 rospy.loginfo("Smach servers: "+str(servers)) 00127 assert '/intro_test' in servers 00128 00129 # Set initial state 00130 injected_ud = smach.UserData() 00131 injected_ud.a = 'A' 00132 init_set = intro_client.set_initial_state('/intro_test','/intro_test',['S2'],injected_ud,timeout = rospy.Duration(10.0)) 00133 00134 assert init_set 00135 00136 sm.enter() 00137 00138 assert sm.get_outcome() == 'done' 00139 00140 00141 00142 00143 def main(): 00144 rospy.init_node('introspection_test',log_level=rospy.DEBUG) 00145 rostest.rosrun('smach_viewer', 'state_machine_test', TestStateMachine) 00146 00147 if __name__=="__main__": 00148 main();