probing.py
Go to the documentation of this file.
00001 import roslib; roslib.load_manifest('zyonz_apps_base')
00002 import rospy
00003 import smach
00004 import smach_ros
00005 from pprint import pprint
00006 
00007 from smach             import Iterator
00008 from geometry_msgs.msg import Pose, PoseStamped, Transform, TransformStamped
00009 
00010 from zyonz_msgs.srv import GetProbingSteps
00011 
00012 from iri_common_smach.st_get_pcl         import GetPCL
00013 from iri_wam_smach.sm_wam_move_from_pose import SM_WAM_MoveFromPose
00014 from zyonz_apps_base.spadmeter           import SM_ZYONZ_CloseGripper
00015 from zyonz_apps_base.move                import SM_ZYONZ_GoHome
00016 
00017 class ZyonzParseProbingMsg(smach.State):
00018     def __init__(self):
00019         smach.State.__init__(self, outcomes    = ['finish'],
00020                                    input_keys  = ['probing_item'],
00021                                    output_keys = ['pre_pose_st','probing_pose_st','post_pose_st'])
00022 
00023     def execute(self, userdata):
00024         pre_pose_st = None
00025         if (len(userdata.probing_item.pre_probing.poses) > 0):
00026             pre_pose_st          = PoseStamped()
00027             pre_pose_st.header   = userdata.probing_item.pre_probing.header
00028             pre_pose_st.pose     = userdata.probing_item.pre_probing.poses[0]
00029 
00030         post_pose_st = None
00031         if (len(userdata.probing_item.post_probing.poses) > 0):
00032             post_pose_st = PoseStamped()
00033             post_pose_st.header   = userdata.probing_item.post_probing.header
00034             post_pose_st.pose     = userdata.probing_item.post_probing.poses[0]
00035 
00036         pprint(pre_pose_st)
00037         pprint(post_pose_st)
00038         pprint("probing:")
00039         pprint(userdata.probing_item.probing)
00040 
00041         userdata.pre_pose_st = pre_pose_st
00042         userdata.probing_pose_st = userdata.probing_item.probing
00043         userdata.post_pose_st = post_pose_st
00044 
00045         return 'finish'
00046 
00047 class ZyonzSelectProbingItem(smach.State):
00048     def __init__(self):
00049         smach.State.__init__(self, outcomes    = ['finish','continue'],
00050                                    input_keys  = ['probing_steps','index'],
00051                                    output_keys = ['probing_item'])
00052 
00053     def execute(self, userdata):
00054         steps = userdata.probing_steps.steps
00055         if (userdata.index >= len(steps)):
00056             return 'finish'
00057 
00058         userdata.probing_item = userdata.probing_steps.steps[userdata.index]
00059         return 'continue'
00060 
00061 class ZyonzParseProbingMsg(smach.State):
00062     def __init__(self):
00063         smach.State.__init__(self, outcomes    = ['finish'],
00064                                    input_keys  = ['probing_item'],
00065                                    output_keys = ['pre_pose_st','probing_pose_st','post_pose_st'])
00066 
00067     def execute(self, userdata):
00068         pre_pose_st = None
00069         if (len(userdata.probing_item.pre_probing.poses) > 0):
00070             pre_pose_st          = PoseStamped()
00071             pre_pose_st.header   = userdata.probing_item.pre_probing.header
00072             pre_pose_st.pose     = userdata.probing_item.pre_probing.poses[0]
00073 
00074         post_pose_st = None
00075         if (len(userdata.probing_item.post_probing.poses) > 0):
00076             post_pose_st = PoseStamped()
00077             post_pose_st.header   = userdata.probing_item.post_probing.header
00078             post_pose_st.pose     = userdata.probing_item.post_probing.poses[0]
00079 
00080         pprint(pre_pose_st)
00081         pprint(post_pose_st)
00082         pprint("probing:")
00083         pprint(userdata.probing_item.probing)
00084 
00085         userdata.pre_pose_st = pre_pose_st
00086         userdata.probing_pose_st = userdata.probing_item.probing
00087         userdata.post_pose_st = post_pose_st
00088 
00089         return 'finish'
00090 
00091 class ZyonzSelectProbingItem(smach.State):
00092     def __init__(self):
00093         smach.State.__init__(self, outcomes    = ['finish','continue'],
00094                                    input_keys  = ['probing_steps','index'],
00095                                    output_keys = ['probing_item'])
00096 
00097     def execute(self, userdata):
00098         steps = userdata.probing_steps.steps
00099         if (userdata.index >= len(steps)):
00100             return 'finish'
00101 
00102         userdata.probing_item = userdata.probing_steps.steps[userdata.index]
00103         return 'continue'
00104 
00105 class ZyonzProbingSMFactory():
00106     def __init__(self):
00107         pass
00108 
00109     def build_internal_sm(self):
00110         sm_approching_factory = SM_WAM_MoveFromPose('/zyonz/wam_wrapper/joints_move',
00111                                                     '/zyonz/iri_wam_spad_ik/get_wam_ik')
00112         sm_approching         = sm_approching_factory.build_sm()
00113         sm_spad_factory       = SM_ZYONZ_CloseGripper()
00114         sm_spad               = sm_spad_factory.build_sm()
00115 
00116         sm = smach.StateMachine(outcomes   = ['success','aborted','continue','no_kinematic_solution'],
00117                                 input_keys = ['probing_steps','index'])
00118 
00119         with sm:
00120            smach.StateMachine.add('SELECT_PROBING_ITEM', ZyonzSelectProbingItem(),
00121                           transitions = {'finish'   : 'success',
00122                                          'continue' : 'PARSE_PROBING_MSG' },
00123                           remapping   = {'probing_item' : 'sm_probing_item'})
00124 
00125            smach.StateMachine.add('PARSE_PROBING_MSG', ZyonzParseProbingMsg(),
00126                           transitions = {'finish': 'PRE_PROBING_STEPS'},
00127                           remapping   = {'probing_item' : 'sm_probing_item'})
00128 
00129            # Approaching steps
00130            # TODO: This should be an iterator over number of approaching steps
00131            smach.StateMachine.add('PRE_PROBING_STEPS', sm_approching,
00132                            transitions={'success': 'PROBING_POSE',
00133                                         'no_kinematic_solution': 'no_kinematic_solution',
00134                                         'aborted': 'aborted' },
00135                            remapping = {'pose_st':'pre_pose_st'})
00136 
00137            # Probing pose
00138            smach.StateMachine.add('PROBING_POSE', sm_approching,
00139                            transitions={'success': 'SPADMETER_ACTION',
00140                                         'no_kinematic_solution': 'no_kinematic_solution',
00141                                         'aborted': 'aborted' },
00142                            remapping = {'pose_st':'probing_pose_st'})
00143 
00144            # Spadmeter action
00145            # TODO: this machine be converted to some type of parameter so probing does not
00146            # imply using the spad all times.
00147            smach.StateMachine.add('SPADMETER_ACTION', sm_spad,
00148                            transitions={'succeeded' : 'POST_PROBING_STEPS',
00149                                         'preempted' : 'aborted',
00150                                         'aborted'   : 'aborted' })
00151 
00152            # Exiting steps
00153            # TODO: This should be an iterator over number of exiting steps
00154            smach.StateMachine.add('POST_PROBING_STEPS', sm_approching,
00155                            transitions={'success': 'continue',
00156                                         'no_kinematic_solution': 'no_kinematic_solution',
00157                                         'aborted': 'aborted' },
00158                            remapping = {'pose_st': 'post_pose_st'})
00159 
00160         return sm
00161 
00162     def build_iterator(self):
00163         # Iterator allows maximum of 10 rounds. It should use the size of probing_steps
00164         # but seems impossible to pass this as input.
00165         iterator = Iterator(outcomes = ['success','aborted','no_kinematic_solution'],
00166                             input_keys = ['probing_steps','rounds'],
00167                             it = range(0, 10), 
00168                             output_keys = [],
00169                             it_label = 'index',
00170                             exhausted_outcome = 'success')
00171         with iterator:
00172             iterator.set_contained_state('RUN_PROBING_ROUND',
00173                                          self.build_internal_sm(),
00174                                          loop_outcomes=['continue'])
00175         return iterator
00176 
00177     def build_sm(self):
00178 
00179         sm = smach.StateMachine(outcomes   = ['succeeded','aborted','no_kinematic_solution'],
00180                                 input_keys = ['probing_steps'])
00181 
00182         # Dirty as hell until SMACH allows to use input keys as iterator items
00183         sm.userdata.rounds = 10
00184         sm_iterator = self.build_iterator()
00185 
00186         with sm:
00187             smach.StateMachine.add('PROBING_ITERATOR', sm_iterator,
00188                     transitions = {'success' : 'succeeded'})
00189 
00190         return sm
00191 
00192 class FindProbingPoint(smach.State):
00193     def __init__(self):
00194         smach.State.__init__(self, outcomes    = ['success','fail'],
00195                                    input_keys  = ['pcl'],
00196                                    output_keys = ['probing_steps'])
00197     def execute(self, userdata):
00198         handler = rospy.ServiceProxy('/zyonz/skills/image_based_leaf_probing/leaf_probing_srv', GetProbingSteps)
00199 
00200         try:
00201             response = handler(userdata.pcl)
00202         except rospy.ServiceException, e:
00203             return 'fail'
00204 
00205         if (not response):
00206             return 'fail'
00207 
00208         pub = rospy.Publisher('/debug/probing/grasp', PoseStamped, None, False, True)
00209         pub.publish(response.steps[0].probing)
00210         pprint(response.steps[0].probing)
00211 
00212         pre_pose_st = PoseStamped()
00213         pre_pose_st.header = response.steps[0].pre_probing.header
00214         pre_pose_st.pose   = response.steps[0].pre_probing.poses[0]
00215         pub = rospy.Publisher('/debug/probing/pre_grasp', PoseStamped, None, False, True)
00216         pub.publish(pre_pose_st)
00217         pprint(pre_pose_st)
00218 
00219         #userdata.probing_steps = response.steps[0]
00220         userdata.probing_steps = response
00221 
00222  #        return 'fail'
00223         return 'success'
00224 
00225 class SM_ZYONZ_FullProbing():
00226     # Build a whole SMACH machine that will do:
00227     # 1. Start in home position
00228     # 2. Capture a PCL from the cam
00229     # 3. Find the probing point
00230     # 4. Perform the Probing action
00231     # 5. Back to home again
00232     def __init__(self):
00233         pass
00234 
00235     def build_sm(self):
00236         factory    = ZyonzProbingSMFactory()
00237         sm_probing = factory.build_sm()
00238         factory    = SM_ZYONZ_GoHome()
00239         sm_goHome  = factory.build_sm()
00240 
00241         sm = smach.StateMachine(outcomes = ['succeeded','aborted','no_kinematic_solution'])
00242 
00243         with sm:
00244             smach.StateMachine.add('GO_HOME', sm_goHome,
00245                               transitions={'success': 'CAPTURE_PCL',
00246                                            'aborted'   : 'aborted'})
00247 
00248             smach.StateMachine.add('CAPTURE_PCL',
00249                              GetPCL("/zyonz/apply_tof_calibration/output"),
00250                                 transitions = {'success' : 'FIND_PROBING_POINT',
00251                                                'fail'    : 'aborted'},
00252                                 remapping   = {'pcl_RGB'  : 'sm_pcl'})
00253 
00254             smach.StateMachine.add('FIND_PROBING_POINT', FindProbingPoint(),
00255                     transitions = {'success' : 'PROBING_ACTION',
00256                                    'fail'    : 'FIND_PROBING_POINT'},
00257                     remapping   = {'pcl'           : 'sm_pcl' ,
00258                                    'probing_steps' : 'sm_probing_msg' })
00259 
00260             smach.StateMachine.add('PROBING_ACTION', sm_probing,
00261                               transitions  = {'succeeded'     : 'GO_END',
00262                                               'no_kinematic_solution' : 'no_kinematic_solution',
00263                                               'aborted' : 'aborted'},
00264                               remapping    = {'probing_steps' : 'sm_probing_msg'})
00265 
00266             smach.StateMachine.add('GO_END', sm_goHome,
00267                               transitions={'success': 'succeeded',
00268                                            'aborted'   : 'aborted'})
00269 
00270         return sm


zyonz_apps_base
Author(s): Jose Luis Rivero
autogenerated on Fri Dec 6 2013 22:53:18