00001 import roslib; roslib.load_manifest('zyonz_apps_base')
00002 import rospy
00003 import smach
00004 import smach_ros
00005 from pprint import pprint
00006
00007 from smach import Iterator
00008 from geometry_msgs.msg import Pose, PoseStamped, Transform, TransformStamped
00009
00010 from zyonz_msgs.srv import GetProbingSteps
00011
00012 from iri_common_smach.st_get_pcl import GetPCL
00013 from iri_wam_smach.sm_wam_move_from_pose import SM_WAM_MoveFromPose
00014 from zyonz_apps_base.spadmeter import SM_ZYONZ_CloseGripper
00015 from zyonz_apps_base.move import SM_ZYONZ_GoHome
00016
00017 class ZyonzParseProbingMsg(smach.State):
00018 def __init__(self):
00019 smach.State.__init__(self, outcomes = ['finish'],
00020 input_keys = ['probing_item'],
00021 output_keys = ['pre_pose_st','probing_pose_st','post_pose_st'])
00022
00023 def execute(self, userdata):
00024 pre_pose_st = None
00025 if (len(userdata.probing_item.pre_probing.poses) > 0):
00026 pre_pose_st = PoseStamped()
00027 pre_pose_st.header = userdata.probing_item.pre_probing.header
00028 pre_pose_st.pose = userdata.probing_item.pre_probing.poses[0]
00029
00030 post_pose_st = None
00031 if (len(userdata.probing_item.post_probing.poses) > 0):
00032 post_pose_st = PoseStamped()
00033 post_pose_st.header = userdata.probing_item.post_probing.header
00034 post_pose_st.pose = userdata.probing_item.post_probing.poses[0]
00035
00036 pprint(pre_pose_st)
00037 pprint(post_pose_st)
00038 pprint("probing:")
00039 pprint(userdata.probing_item.probing)
00040
00041 userdata.pre_pose_st = pre_pose_st
00042 userdata.probing_pose_st = userdata.probing_item.probing
00043 userdata.post_pose_st = post_pose_st
00044
00045 return 'finish'
00046
00047 class ZyonzSelectProbingItem(smach.State):
00048 def __init__(self):
00049 smach.State.__init__(self, outcomes = ['finish','continue'],
00050 input_keys = ['probing_steps','index'],
00051 output_keys = ['probing_item'])
00052
00053 def execute(self, userdata):
00054 steps = userdata.probing_steps.steps
00055 if (userdata.index >= len(steps)):
00056 return 'finish'
00057
00058 userdata.probing_item = userdata.probing_steps.steps[userdata.index]
00059 return 'continue'
00060
00061 class ZyonzParseProbingMsg(smach.State):
00062 def __init__(self):
00063 smach.State.__init__(self, outcomes = ['finish'],
00064 input_keys = ['probing_item'],
00065 output_keys = ['pre_pose_st','probing_pose_st','post_pose_st'])
00066
00067 def execute(self, userdata):
00068 pre_pose_st = None
00069 if (len(userdata.probing_item.pre_probing.poses) > 0):
00070 pre_pose_st = PoseStamped()
00071 pre_pose_st.header = userdata.probing_item.pre_probing.header
00072 pre_pose_st.pose = userdata.probing_item.pre_probing.poses[0]
00073
00074 post_pose_st = None
00075 if (len(userdata.probing_item.post_probing.poses) > 0):
00076 post_pose_st = PoseStamped()
00077 post_pose_st.header = userdata.probing_item.post_probing.header
00078 post_pose_st.pose = userdata.probing_item.post_probing.poses[0]
00079
00080 pprint(pre_pose_st)
00081 pprint(post_pose_st)
00082 pprint("probing:")
00083 pprint(userdata.probing_item.probing)
00084
00085 userdata.pre_pose_st = pre_pose_st
00086 userdata.probing_pose_st = userdata.probing_item.probing
00087 userdata.post_pose_st = post_pose_st
00088
00089 return 'finish'
00090
00091 class ZyonzSelectProbingItem(smach.State):
00092 def __init__(self):
00093 smach.State.__init__(self, outcomes = ['finish','continue'],
00094 input_keys = ['probing_steps','index'],
00095 output_keys = ['probing_item'])
00096
00097 def execute(self, userdata):
00098 steps = userdata.probing_steps.steps
00099 if (userdata.index >= len(steps)):
00100 return 'finish'
00101
00102 userdata.probing_item = userdata.probing_steps.steps[userdata.index]
00103 return 'continue'
00104
00105 class ZyonzProbingSMFactory():
00106 def __init__(self):
00107 pass
00108
00109 def build_internal_sm(self):
00110 sm_approching_factory = SM_WAM_MoveFromPose('/zyonz/wam_wrapper/joints_move',
00111 '/zyonz/iri_wam_spad_ik/get_wam_ik')
00112 sm_approching = sm_approching_factory.build_sm()
00113 sm_spad_factory = SM_ZYONZ_CloseGripper()
00114 sm_spad = sm_spad_factory.build_sm()
00115
00116 sm = smach.StateMachine(outcomes = ['success','aborted','continue','no_kinematic_solution'],
00117 input_keys = ['probing_steps','index'])
00118
00119 with sm:
00120 smach.StateMachine.add('SELECT_PROBING_ITEM', ZyonzSelectProbingItem(),
00121 transitions = {'finish' : 'success',
00122 'continue' : 'PARSE_PROBING_MSG' },
00123 remapping = {'probing_item' : 'sm_probing_item'})
00124
00125 smach.StateMachine.add('PARSE_PROBING_MSG', ZyonzParseProbingMsg(),
00126 transitions = {'finish': 'PRE_PROBING_STEPS'},
00127 remapping = {'probing_item' : 'sm_probing_item'})
00128
00129
00130
00131 smach.StateMachine.add('PRE_PROBING_STEPS', sm_approching,
00132 transitions={'success': 'PROBING_POSE',
00133 'no_kinematic_solution': 'no_kinematic_solution',
00134 'aborted': 'aborted' },
00135 remapping = {'pose_st':'pre_pose_st'})
00136
00137
00138 smach.StateMachine.add('PROBING_POSE', sm_approching,
00139 transitions={'success': 'SPADMETER_ACTION',
00140 'no_kinematic_solution': 'no_kinematic_solution',
00141 'aborted': 'aborted' },
00142 remapping = {'pose_st':'probing_pose_st'})
00143
00144
00145
00146
00147 smach.StateMachine.add('SPADMETER_ACTION', sm_spad,
00148 transitions={'succeeded' : 'POST_PROBING_STEPS',
00149 'preempted' : 'aborted',
00150 'aborted' : 'aborted' })
00151
00152
00153
00154 smach.StateMachine.add('POST_PROBING_STEPS', sm_approching,
00155 transitions={'success': 'continue',
00156 'no_kinematic_solution': 'no_kinematic_solution',
00157 'aborted': 'aborted' },
00158 remapping = {'pose_st': 'post_pose_st'})
00159
00160 return sm
00161
00162 def build_iterator(self):
00163
00164
00165 iterator = Iterator(outcomes = ['success','aborted','no_kinematic_solution'],
00166 input_keys = ['probing_steps','rounds'],
00167 it = range(0, 10),
00168 output_keys = [],
00169 it_label = 'index',
00170 exhausted_outcome = 'success')
00171 with iterator:
00172 iterator.set_contained_state('RUN_PROBING_ROUND',
00173 self.build_internal_sm(),
00174 loop_outcomes=['continue'])
00175 return iterator
00176
00177 def build_sm(self):
00178
00179 sm = smach.StateMachine(outcomes = ['succeeded','aborted','no_kinematic_solution'],
00180 input_keys = ['probing_steps'])
00181
00182
00183 sm.userdata.rounds = 10
00184 sm_iterator = self.build_iterator()
00185
00186 with sm:
00187 smach.StateMachine.add('PROBING_ITERATOR', sm_iterator,
00188 transitions = {'success' : 'succeeded'})
00189
00190 return sm
00191
00192 class FindProbingPoint(smach.State):
00193 def __init__(self):
00194 smach.State.__init__(self, outcomes = ['success','fail'],
00195 input_keys = ['pcl'],
00196 output_keys = ['probing_steps'])
00197 def execute(self, userdata):
00198 handler = rospy.ServiceProxy('/zyonz/skills/image_based_leaf_probing/leaf_probing_srv', GetProbingSteps)
00199
00200 try:
00201 response = handler(userdata.pcl)
00202 except rospy.ServiceException, e:
00203 return 'fail'
00204
00205 if (not response):
00206 return 'fail'
00207
00208 pub = rospy.Publisher('/debug/probing/grasp', PoseStamped, None, False, True)
00209 pub.publish(response.steps[0].probing)
00210 pprint(response.steps[0].probing)
00211
00212 pre_pose_st = PoseStamped()
00213 pre_pose_st.header = response.steps[0].pre_probing.header
00214 pre_pose_st.pose = response.steps[0].pre_probing.poses[0]
00215 pub = rospy.Publisher('/debug/probing/pre_grasp', PoseStamped, None, False, True)
00216 pub.publish(pre_pose_st)
00217 pprint(pre_pose_st)
00218
00219
00220 userdata.probing_steps = response
00221
00222
00223 return 'success'
00224
00225 class SM_ZYONZ_FullProbing():
00226
00227
00228
00229
00230
00231
00232 def __init__(self):
00233 pass
00234
00235 def build_sm(self):
00236 factory = ZyonzProbingSMFactory()
00237 sm_probing = factory.build_sm()
00238 factory = SM_ZYONZ_GoHome()
00239 sm_goHome = factory.build_sm()
00240
00241 sm = smach.StateMachine(outcomes = ['succeeded','aborted','no_kinematic_solution'])
00242
00243 with sm:
00244 smach.StateMachine.add('GO_HOME', sm_goHome,
00245 transitions={'success': 'CAPTURE_PCL',
00246 'aborted' : 'aborted'})
00247
00248 smach.StateMachine.add('CAPTURE_PCL',
00249 GetPCL("/zyonz/apply_tof_calibration/output"),
00250 transitions = {'success' : 'FIND_PROBING_POINT',
00251 'fail' : 'aborted'},
00252 remapping = {'pcl_RGB' : 'sm_pcl'})
00253
00254 smach.StateMachine.add('FIND_PROBING_POINT', FindProbingPoint(),
00255 transitions = {'success' : 'PROBING_ACTION',
00256 'fail' : 'FIND_PROBING_POINT'},
00257 remapping = {'pcl' : 'sm_pcl' ,
00258 'probing_steps' : 'sm_probing_msg' })
00259
00260 smach.StateMachine.add('PROBING_ACTION', sm_probing,
00261 transitions = {'succeeded' : 'GO_END',
00262 'no_kinematic_solution' : 'no_kinematic_solution',
00263 'aborted' : 'aborted'},
00264 remapping = {'probing_steps' : 'sm_probing_msg'})
00265
00266 smach.StateMachine.add('GO_END', sm_goHome,
00267 transitions={'success': 'succeeded',
00268 'aborted' : 'aborted'})
00269
00270 return sm