00001
00002
00003 import roslib; roslib.load_manifest('smach_viewer')
00004 import rospy
00005 import rostest
00006
00007 import unittest
00008
00009 from actionlib import *
00010 from actionlib.msg import *
00011
00012 from smach import *
00013
00014 from smach_msgs.msg import *
00015 from geometry_msgs.msg import PoseStamped
00016
00017
00018
00019 g1 = TestGoal(1)
00020 g2 = TestGoal(2)
00021 g3 = TestGoal(3)
00022
00023
00024 class Setter(State):
00025 """State that sets the key 'a' in its userdata"""
00026 def __init__(self):
00027 State.__init__(self,default_outcome='done')
00028 def enter(self):
00029 rospy.sleep(0.5)
00030 self.userdata.a = 'A'
00031 self.userdata.a_message = PoseStamped()
00032 rospy.loginfo("Added key 'a'.")
00033 rospy.sleep(0.5)
00034 return 'done'
00035
00036 class Getter(State):
00037 """State that grabs the key 'a' from userdata, and sets 'b'"""
00038 def __init__(self):
00039 State.__init__(self,default_outcome='done')
00040 def enter(self):
00041 rospy.sleep(0.5)
00042 while 'a' not in self.userdata:
00043
00044 rospy.sleep(0.1)
00045 self.userdata.b = self.userdata.a
00046 return 'done'
00047
00048
00049 class TestStateMachine(unittest.TestCase):
00050 def test_introspection(self):
00051 """Test introspection system."""
00052
00053 sm = StateMachine(['aborted','preempted'])
00054 sm2 = StateMachine(['done'])
00055 sm3 = StateMachine(['done'])
00056 sm4 = StateMachine(['done'])
00057 sm5 = StateMachine(['done'])
00058
00059
00060 con_split = ConcurrentSplit(default_outcome = 'succeeded')
00061 con_split.add(
00062 ('SETTER', Setter()),
00063 ('RADICAL',
00064 sm5.add(
00065 ('T6',SPAState(),
00066 { 'succeeded':'SETTER',
00067 'aborted':'T6',
00068 'preempted':'SETTER'}),
00069 ('SETTER',Setter(),
00070 { 'done':'done'}) ) ),
00071 ('GETTER', Getter())
00072 )
00073 con_split.add_outcome_map(({'SETTER':'done'},'succeeded'))
00074
00075
00076 sm.add(
00077 state_machine.sequence('done',
00078 ('GETTER1', Getter(), {}),
00079 ('S2',
00080 sm2.add(
00081 ('SETTER', Setter(), {'done':'A_SPLIT'}),
00082 ('A_SPLIT', con_split, {'succeeded':'done'}) ),
00083 {} ),
00084 ('S3',
00085 sm3.add(
00086 ('SETTER', Setter(), {'done':'RADICAL'}),
00087 ('RADICAL',
00088 sm4.add(
00089 ('T5',SPAState(),
00090 { 'succeeded':'SETTER',
00091 'aborted':'T5',
00092 'preempted':'SETTER'}),
00093 ('SETTER',Setter(),{'done':'done'})
00094 ),
00095 {'done':'SETTER2'} ),
00096 ('SETTER2', Setter(), {'done':'done'})
00097 ),
00098 {'done':'TRINARY!'} )
00099 )
00100 )
00101
00102 sm.add(('TRINARY!',SPAState(),{'succeeded':'T2','aborted':'T3','preempted':'T4'}))
00103 sm.add(
00104 state_machine.sequence('succeeded',
00105 ('T2',SPAState(),{}),
00106 ('T3',SPAState(),{'aborted':'S2'}),
00107 ('T4',SPAState(),{'succeeded':'GETTER2','aborted':'TRINARY!'})
00108 )
00109 )
00110
00111 sm.add(('GETTER2', Getter(), {'done':'GETTER1'}))
00112
00113
00114 sm.set_initial_state(['GETTER1'],smach.UserData())
00115 sm2.set_initial_state(['SETTER'],smach.UserData())
00116 sm3.set_initial_state(['SETTER'],smach.UserData())
00117 sm4.set_initial_state(['T5'],smach.UserData())
00118 sm5.set_initial_state(['T6'],smach.UserData())
00119
00120
00121 intro_server = smach.IntrospectionServer('intro_test',sm,'/intro_test')
00122
00123 intro_client = smach.IntrospectionClient()
00124 servers = intro_client.get_servers()
00125
00126 rospy.loginfo("Smach servers: "+str(servers))
00127 assert '/intro_test' in servers
00128
00129
00130 injected_ud = smach.UserData()
00131 injected_ud.a = 'A'
00132 init_set = intro_client.set_initial_state('/intro_test','/intro_test',['S2'],injected_ud,timeout = rospy.Duration(10.0))
00133
00134 assert init_set
00135
00136 sm.enter()
00137
00138 assert sm.get_outcome() == 'done'
00139
00140
00141
00142
00143 def main():
00144 rospy.init_node('introspection_test',log_level=rospy.DEBUG)
00145 rostest.rosrun('smach_viewer', 'state_machine_test', TestStateMachine)
00146
00147 if __name__=="__main__":
00148 main();